diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index e17f987b5c67..15674d696cd6 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef; /// `InvertedIndexCreator` provides functionality to construct an inverted index #[async_trait] -pub trait InvertedIndexCreator { +pub trait InvertedIndexCreator: Send { /// Adds a value to the named index. A `None` value represents an absence of data (null) /// /// - `index_name`: Identifier for the index being built diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index 0dd2f7a5473b..24478d5e22d2 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -28,7 +28,7 @@ use crate::inverted_index::format::reader::InvertedIndexReader; /// avoiding repeated compilation of fixed predicates such as regex patterns. #[mockall::automock] #[async_trait] -pub trait IndexApplier { +pub trait IndexApplier: Send + Sync { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). async fn apply<'a>( diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index d057323083ae..1699c6880462 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -68,11 +68,19 @@ impl AccessLayer { /// Deletes a SST file with given file id. 
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> { - let path = location::sst_file_path(&self.region_dir, file_id); + let sst_path = location::sst_file_path(&self.region_dir, file_id); self.object_store - .delete(&path) + .delete(&sst_path) .await - .context(DeleteSstSnafu { file_id }) + .context(DeleteSstSnafu { file_id })?; + + let index_path = location::index_file_path(&self.region_dir, file_id); + self.object_store + .delete(&index_path) + .await + .context(OpenDalSnafu) + // ignore error if index file not found for compatibility + .or_else(|e| e.is_object_not_found().then_some(()).ok_or(e)) } /// Returns a reader builder for specific `file`. @@ -88,7 +96,6 @@ impl AccessLayer { request: SstWriteRequest, write_opts: &WriteOptions, ) -> Result> { - let path = location::sst_file_path(&self.region_dir, request.file_id); let region_id = request.metadata.region_id; let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { @@ -100,7 +107,7 @@ impl AccessLayer { metadata: request.metadata, source: request.source, storage: request.storage, - upload_path: path, + region_dir: self.region_dir.clone(), remote_store: self.object_store.clone(), }, write_opts, @@ -108,7 +115,12 @@ impl AccessLayer { .await? } else { // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + let mut writer = ParquetWriter::new( + self.region_dir.clone(), + request.file_id, + request.metadata, + self.object_store.clone(), + ); writer.write_all(request.source, write_opts).await? }; diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 9775f3d79160..6d70afbe7b14 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -79,8 +79,12 @@ impl WriteCache { ) -> Result> { // TODO(yingwen): Write to the local store and then upload. // Now we write to the remote and ignore local cache. 
- let mut writer = - ParquetWriter::new(request.upload_path, request.metadata, request.remote_store); + let mut writer = ParquetWriter::new( + request.region_dir, + request.file_id, + request.metadata, + request.remote_store, + ); writer.write_all(request.source, write_opts).await } } @@ -91,8 +95,7 @@ pub struct SstUploadRequest { pub metadata: RegionMetadataRef, pub source: Source, pub storage: Option, - /// Path to upload the file. - pub upload_path: String, + pub region_dir: String, /// Remote object store to upload. pub remote_store: ObjectStore, } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0ddcec61d0f2..7ab0f55091f9 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -21,7 +21,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error}; pub use picker::CompactionPickerRef; use snafu::ResultExt; @@ -53,9 +52,10 @@ pub struct CompactionRequest { pub(crate) file_purger: FilePurgerRef, /// Start time of compaction task. pub(crate) start_time: Instant, - /// Buffering threshold while writing SST files. 
- pub(crate) sst_write_buffer_size: ReadableSize, + pub(crate) cache_manager: CacheManagerRef, + + pub(crate) engine_config: Arc, } impl CompactionRequest { @@ -337,8 +337,8 @@ impl CompactionStatus { waiters: Vec::new(), file_purger: self.file_purger.clone(), start_time, - sst_write_buffer_size: engine_config.sst_write_buffer_size, cache_manager, + engine_config, }; if let Some(pending) = self.pending_compaction.take() { diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index fefae906ba69..1373049882de 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -35,6 +35,7 @@ pub fn new_file_handle( ), level, file_size: 0, + inverted_index_available: false, }, file_purger, ) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index e97030ac383c..780d965ffbba 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -17,7 +17,6 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; -use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; @@ -31,7 +30,9 @@ use crate::access_layer::{AccessLayerRef, SstWriteRequest}; use crate::cache::CacheManagerRef; use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; -use crate::error::{self, CompactRegionSnafu}; +use crate::config::MitoConfig; +use crate::error; +use crate::error::CompactRegionSnafu; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; @@ -41,7 +42,7 @@ use crate::request::{ }; use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; use crate::sst::file_purger::FilePurgerRef; -use crate::sst::parquet::WriteOptions; +use crate::sst::parquet::{InvertedIndexOptions, 
WriteOptions}; use crate::sst::version::LevelMeta; const MAX_PARALLEL_COMPACTION: usize = 8; @@ -128,8 +129,8 @@ impl Picker for TwcsPicker { waiters, file_purger, start_time, - sst_write_buffer_size, cache_manager, + engine_config, } = req; let region_metadata = current_version.metadata.clone(); @@ -177,7 +178,6 @@ impl Picker for TwcsPicker { sst_layer: access_layer, outputs, expired_ssts, - sst_write_buffer_size, compaction_time_window: Some(time_window_size), request_sender, waiters, @@ -185,6 +185,7 @@ impl Picker for TwcsPicker { start_time, cache_manager, storage: current_version.options.storage.clone(), + engine_config, }; Some(Box::new(task)) } @@ -238,7 +239,6 @@ pub(crate) struct TwcsCompactionTask { pub sst_layer: AccessLayerRef, pub outputs: Vec, pub expired_ssts: Vec, - pub sst_write_buffer_size: ReadableSize, pub compaction_time_window: Option, pub file_purger: FilePurgerRef, /// Request sender to notify the worker. @@ -250,6 +250,7 @@ pub(crate) struct TwcsCompactionTask { pub(crate) cache_manager: CacheManagerRef, /// Target storage of the region. 
pub(crate) storage: Option, + pub engine_config: Arc, } impl Debug for TwcsCompactionTask { @@ -299,8 +300,17 @@ impl TwcsCompactionTask { output.output_file_id ); + let index_config = &self.engine_config.inverted_index; + let inverted_index = + (!index_config.disable_creation_on_compact).then(|| InvertedIndexOptions { + creation_memory_usage_threshold: index_config + .creation_memory_usage_threshold + .map(|size| size.as_bytes() as _), + }); + let write_opts = WriteOptions { - write_buffer_size: self.sst_write_buffer_size, + write_buffer_size: self.engine_config.sst_write_buffer_size, + inverted_index, ..Default::default() }; let metadata = self.metadata.clone(); @@ -330,6 +340,7 @@ impl TwcsCompactionTask { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, + inverted_index_available: sst_info.inverted_index_available, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 0723c702ae70..80767ead78c5 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -89,6 +89,8 @@ pub struct MitoConfig { pub parallel_scan_channel_size: usize, /// Whether to allow stale entries read during replay. 
pub allow_stale_entries: bool, + + pub inverted_index: InvertedIndexConfig, } impl Default for MitoConfig { @@ -113,6 +115,7 @@ impl Default for MitoConfig { scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, + inverted_index: InvertedIndexConfig::default(), } } } @@ -188,3 +191,20 @@ fn divide_num_cpus(divisor: usize) -> usize { (cores + divisor - 1) / divisor } + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct InvertedIndexConfig { + pub disable_creation_on_flush: bool, + pub disable_creation_on_compact: bool, + pub creation_memory_usage_threshold: Option, +} + +impl Default for InvertedIndexConfig { + fn default() -> Self { + InvertedIndexConfig { + disable_creation_on_flush: false, + disable_creation_on_compact: false, + creation_memory_usage_threshold: Some(ReadableSize::mb(128)), + } + } +} diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2e3dc4de0d53..ed8cd99e95da 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -553,5 +553,5 @@ async fn test_region_usage() { assert_eq!(region_stat.sst_usage, 2742); // region total usage - assert_eq!(region_stat.disk_usage(), 3748); + assert_eq!(region_stat.disk_usage(), 3780); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 10a1de0e5de2..ebcf173a5f88 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -41,7 +41,7 @@ use crate::request::{ use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; -use crate::sst::parquet::WriteOptions; +use crate::sst::parquet::{InvertedIndexOptions, WriteOptions}; use crate::worker::WorkerListener; /// Global write buffer (memtable) manager. 
@@ -294,8 +294,17 @@ impl RegionFlushTask { .with_label_values(&["flush_memtables"]) .start_timer(); + let inverted_index_config = &self.engine_config.inverted_index; + let inverted_index_options = + (!inverted_index_config.disable_creation_on_flush).then(|| InvertedIndexOptions { + creation_memory_usage_threshold: inverted_index_config + .creation_memory_usage_threshold + .map(|size| size.as_bytes() as _), + }); + let mut write_opts = WriteOptions { write_buffer_size: self.engine_config.sst_write_buffer_size, + inverted_index: inverted_index_options, ..Default::default() }; if let Some(row_group_size) = self.row_group_size { @@ -339,6 +348,7 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, + inverted_index_available: sst_info.inverted_index_available, }; file_metas.push(file_meta); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index c28f6cd6d598..805bb1c9914a 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -171,6 +171,7 @@ async fn checkpoint_with_different_compression_types() { time_range: (0.into(), 10000000.into()), level: 0, file_size: 1024000, + inverted_index_available: false, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index eb84d6d59599..7ca316e622b9 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -121,6 +121,9 @@ lazy_static! { /// Counter of filtered rows during merge. pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap(); + /// Counter of row groups read. 
+ pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec = + register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap(); // ------- End of query metrics. // Cache related metrics. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4a8c7028357b..f3de2c64e3ae 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,8 +14,10 @@ //! Scans a region according to the scan request. +use std::sync::Arc; + use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; @@ -27,6 +29,8 @@ use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::applier::SstIndexApplierRef; /// A scanner scans a region and returns a [SendableRecordBatchStream]. pub(crate) enum Scanner { @@ -194,6 +198,7 @@ impl ScanRegion { total_ssts ); + let index_applier = self.build_index_applier(); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { @@ -204,6 +209,7 @@ impl ScanRegion { let seq_scan = SeqScan::new(self.access_layer.clone(), mapper) .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) + .with_index_applier(index_applier) .with_memtables(memtables) .with_files(files) .with_cache(self.cache_manager) @@ -224,6 +230,24 @@ impl ScanRegion { TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) .build() } + + /// Use the latest schema to build the index applier. 
+ /// + /// To use this fixed schema to apply to different versions of SSTs, we have to make sure: + /// 1. Type of column cannot be changed. + /// 2. Column cannot be renamed. + fn build_index_applier(&self) -> Option { + SstIndexApplierBuilder::new( + self.access_layer.region_dir().to_string(), + self.access_layer.object_store().clone(), + self.version.metadata.as_ref(), + ) + .build(&self.request.filters) + .inspect_err(|err| warn!(err; "Failed to build index applier")) + .ok() + .flatten() + .map(Arc::new) + } } /// Config for parallel scan. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index f963568cad59..5a641f89cab5 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -39,6 +39,7 @@ use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanParallism; use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source}; use crate::sst::file::FileHandle; +use crate::sst::index::applier::SstIndexApplierRef; /// Scans a region and returns rows in a sorted sequence. /// @@ -62,6 +63,8 @@ pub struct SeqScan { ignore_file_not_found: bool, /// Parallelism to scan data. parallelism: ScanParallism, + + index_applier: Option, } impl SeqScan { @@ -73,6 +76,7 @@ impl SeqScan { mapper: Arc::new(mapper), time_range: None, predicate: None, + index_applier: None, memtables: Vec::new(), files: Vec::new(), cache_manager: None, @@ -95,6 +99,12 @@ impl SeqScan { self } + #[must_use] + pub(crate) fn with_index_applier(mut self, index_applier: Option) -> Self { + self.index_applier = index_applier; + self + } + /// Sets memtables to read. 
 #[must_use] pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { @@ -210,6 +220,7 @@ impl SeqScan { .access_layer .read_sst(file.clone()) .predicate(self.predicate.clone()) + .index_applier(self.index_applier.clone()) .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index d32133d1bcc6..59a1ed290879 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -95,6 +95,8 @@ pub struct FileMeta { pub level: Level, /// Size of the file. pub file_size: u64, + /// Whether inverted index is available. + pub inverted_index_available: bool, } /// Handle to a SST file. @@ -236,6 +238,7 @@ mod tests { time_range: FileTimeRange::default(), level, file_size: 0, + inverted_index_available: false, } } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 059b1956d7e2..95d631d7d8b9 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -137,6 +137,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 4096, + inverted_index_available: false, }, file_purger, ); diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 0efc3f8e6ad0..4e7279ab2f11 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -15,6 +15,7 @@ pub mod builder; use std::collections::BTreeSet; +use std::sync::Arc; use futures::{AsyncRead, AsyncSeek}; use index::inverted_index::format::reader::InvertedIndexBlobReader; @@ -52,6 +53,8 @@ pub struct SstIndexApplier { index_applier: Box, } +pub type SstIndexApplierRef = Arc; + impl SstIndexApplier { /// Creates a new [`SstIndexApplier`]. 
pub fn new( diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index fe328f16b121..978e84800e71 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -44,6 +44,8 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Inverted index options. If it's None, inverted index will not be created. + pub inverted_index: Option, } impl Default for WriteOptions { @@ -51,10 +53,18 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, + inverted_index: Some(InvertedIndexOptions::default()), } } } +#[derive(Debug, Default)] +pub struct InvertedIndexOptions { + /// The memory usage threshold for inverted index creation. + /// Set to non-none value to enable external sort during inverted index creation + pub creation_memory_usage_threshold: Option, +} + /// Parquet SST info returned by the writer. pub struct SstInfo { /// Time range of the SST. @@ -65,6 +75,8 @@ pub struct SstInfo { pub num_rows: usize, /// File Meta Data pub file_metadata: Option>, + /// Whether inverted index is available. 
+ pub inverted_index_available: bool, } #[cfg(test)] @@ -103,7 +115,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -116,7 +127,12 @@ mod tests { ..Default::default() }; - let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone()); + let mut writer = ParquetWriter::new( + FILE_DIR.to_string(), + handle.file_id(), + metadata, + object_store.clone(), + ); let info = writer .write_all(source, &write_opts) .await @@ -152,7 +168,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -165,7 +180,12 @@ mod tests { ..Default::default() }; // Prepare data. 
- let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + FILE_DIR.to_string(), + handle.file_id(), + metadata.clone(), + object_store.clone(), + ); writer .write_all(source, &write_opts) .await @@ -220,7 +240,6 @@ mod tests { let mut env = crate::test_util::TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -234,12 +253,18 @@ mod tests { // write the sst file and get sst info // sst info contains the parquet metadata, which is converted from FileMetaData - let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + FILE_DIR.to_string(), + handle.file_id(), + metadata.clone(), + object_store.clone(), + ); let sst_info = writer .write_all(source, &write_opts) .await .unwrap() .expect("write_all should return sst info"); + assert!(sst_info.inverted_index_available); let writer_metadata = sst_info.file_metadata.unwrap(); // read the sst file metadata diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 60729c664283..15592a2626ee 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,12 +14,12 @@ //! Parquet reader. 
-use std::collections::{HashSet, VecDeque}; +use std::collections::{BTreeSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; @@ -39,9 +39,10 @@ use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; -use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; +use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; +use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -64,6 +65,8 @@ pub struct ParquetReaderBuilder { projection: Option>, /// Manager that caches SST data. cache_manager: Option, + + index_applier: Option, } impl ParquetReaderBuilder { @@ -81,6 +84,7 @@ impl ParquetReaderBuilder { time_range: None, projection: None, cache_manager: None, + index_applier: None, } } @@ -110,6 +114,11 @@ impl ParquetReaderBuilder { self } + pub fn index_applier(mut self, index_applier: Option) -> Self { + self.index_applier = index_applier; + self + } + /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. @@ -129,35 +138,8 @@ impl ParquetReaderBuilder { // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; - // Computes column ids to read. 
- let column_ids: HashSet<_> = self - .projection - .as_ref() - .map(|p| p.iter().cloned().collect()) - .unwrap_or_else(|| { - region_meta - .column_metadatas - .iter() - .map(|c| c.column_id) - .collect() - }); let read_format = ReadFormat::new(Arc::new(region_meta)); - // Prunes row groups by metadata. - let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate { - let stats = - RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids); - - predicate - .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema()) - .into_iter() - .enumerate() - .filter_map(|(idx, valid)| if valid { Some(idx) } else { None }) - .collect() - } else { - (0..parquet_meta.num_row_groups()).collect() - }; - // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); let projection_mask = if let Some(column_ids) = self.projection.as_ref() { @@ -174,6 +156,13 @@ impl ParquetReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadParquetSnafu { path: &file_path })?; + let mut metrics = Metrics::default(); + + // Computes row groups to read. + let row_groups = self + .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) + .await; + let reader_builder = RowGroupReaderBuilder { file_handle: self.file_handle.clone(), file_path, @@ -184,12 +173,7 @@ impl ParquetReaderBuilder { cache_manager: self.cache_manager.clone(), }; - let metrics = Metrics { - read_row_groups: row_groups.len(), - build_cost: start.elapsed(), - ..Default::default() - }; - + metrics.build_cost = start.elapsed(); Ok(ParquetReader { row_groups, read_format, @@ -256,13 +240,67 @@ impl ParquetReaderBuilder { Ok(metadata) } + + /// Computes row groups to read. 
+ async fn row_groups_to_read( + &self, + read_format: &ReadFormat, + parquet_meta: &ParquetMetaData, + metrics: &mut Metrics, + ) -> BTreeSet { + let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect(); + metrics.num_row_groups_unfiltered += row_group_ids.len(); + + // Applies index to prune row groups. + if let Some(index_applier) = &self.index_applier { + if self.file_handle.meta().inverted_index_available { + match index_applier.apply(self.file_handle.file_id()).await { + Ok(row_groups) => row_group_ids = row_groups, + Err(err) => warn!(err; "Failed to apply index"), + } + } + } + metrics.num_row_groups_inverted_index_filtered += row_group_ids.len(); + + // Prunes row groups by metadata. + if let Some(predicate) = &self.predicate { + let region_meta = read_format.metadata(); + let column_ids = match &self.projection { + Some(ids) => ids.iter().cloned().collect(), + None => region_meta + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(), + }; + + let row_groups = parquet_meta.row_groups(); + let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids); + let row_groups_to_prune = predicate + .prune_with_stats(&stats, region_meta.schema.arrow_schema()) + .into_iter() + .enumerate() + .filter_map(|(id, remain)| (!remain).then_some(id)); + + for row_group_id in row_groups_to_prune { + row_group_ids.remove(&row_group_id); + } + }; + metrics.num_row_groups_min_max_filtered += row_group_ids.len(); + + row_group_ids + } } /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { - /// Number of row groups to read. - read_row_groups: usize, + /// Number of unfiltered row groups. + num_row_groups_unfiltered: usize, + /// Number of row groups to read after filtering by inverted index. + num_row_groups_inverted_index_filtered: usize, + /// Number of row groups to read after filtering by min-max index. + num_row_groups_min_max_filtered: usize, /// Duration to build the parquet reader. 
build_cost: Duration, /// Duration to scan the reader. @@ -337,7 +375,7 @@ impl RowGroupReaderBuilder { /// Parquet batch reader to read our SST format. pub struct ParquetReader { /// Indices of row groups to read. - row_groups: VecDeque, + row_groups: BTreeSet, /// Helper to read record batches. /// /// Not `None` if [ParquetReader::stream] is not `None`. @@ -390,8 +428,8 @@ impl Drop for ParquetReader { self.reader_builder.file_handle.region_id(), self.reader_builder.file_handle.file_id(), self.reader_builder.file_handle.time_range(), - self.metrics.read_row_groups, - self.reader_builder.parquet_meta.num_row_groups(), + self.metrics.num_row_groups_min_max_filtered, + self.metrics.num_row_groups_unfiltered, self.metrics ); @@ -405,6 +443,15 @@ impl Drop for ParquetReader { READ_ROWS_TOTAL .with_label_values(&["parquet"]) .inc_by(self.metrics.num_rows as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["unfiltered"]) + .inc_by(self.metrics.num_row_groups_unfiltered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["inverted_index_filtered"]) + .inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["min_max_filtered"]) + .inc_by(self.metrics.num_row_groups_min_max_filtered as u64); } } @@ -432,7 +479,7 @@ impl ParquetReader { } // No more items in current row group, reads next row group. - while let Some(row_group_idx) = self.row_groups.pop_front() { + while let Some(row_group_idx) = self.row_groups.pop_first() { let mut row_group_reader = self.reader_builder.build(row_group_idx).await?; let Some(record_batch) = row_group_reader diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 2ed226791dca..7ff0bb68bcd2 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,11 +14,13 @@ //! Parquet writer. 
+use std::num::NonZeroUsize; use std::sync::Arc; use common_datasource::file_format::parquet::BufferedWriter; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::Timestamp; +use futures::TryFutureExt; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; @@ -27,17 +29,23 @@ use parquet::schema::types::ColumnPath; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; +use store_api::storage::RegionId; use super::helper::parse_parquet_metadata; use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu}; use crate::read::{Batch, Source}; +use crate::sst::file::FileId; +use crate::sst::index::creator::SstIndexCreator; +use crate::sst::location; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; /// Parquet SST writer. pub struct ParquetWriter { - /// SST output file path. - file_path: String, + /// Directory of the region. + region_dir: String, + /// SST file id. + file_id: FileId, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, object_store: ObjectStore, @@ -46,12 +54,14 @@ pub struct ParquetWriter { impl ParquetWriter { /// Creates a new parquet SST writer. 
pub fn new( - file_path: String, + region_dir: String, + file_id: FileId, metadata: RegionMetadataRef, object_store: ObjectStore, ) -> ParquetWriter { ParquetWriter { - file_path, + region_dir, + file_id, metadata, object_store, } @@ -78,9 +88,10 @@ impl ParquetWriter { let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build(); + let file_path = location::sst_file_path(&self.region_dir, self.file_id); let write_format = WriteFormat::new(self.metadata.clone()); let mut buffered_writer = BufferedWriter::try_new( - self.file_path.clone(), + file_path.clone(), self.object_store.clone(), write_format.arrow_schema(), Some(writer_props), @@ -90,21 +101,30 @@ impl ParquetWriter { .context(WriteBufferSnafu)?; let mut stats = SourceStats::default(); - while let Some(batch) = source.next_batch().await? { - stats.update(&batch); - let arrow_batch = write_format.convert_batch(&batch)?; + let mut index = Indexer::new( + self.file_id, + self.region_dir.clone(), + &self.metadata, + self.object_store.clone(), + opts, + ); - buffered_writer - .write(&arrow_batch) - .await - .context(WriteBufferSnafu)?; + while let Some(batch) = write_next_batch(&mut source, &write_format, &mut buffered_writer) + .or_else(|err| async { + // abort index creation if error occurs. + index.abort().await; + Err(err) + }) + .await? 
+ { + stats.update(&batch); + index.update(&batch).await; } + let inverted_index_available = index.finish().await; + if stats.num_rows == 0 { - debug!( - "No data written, try to stop the writer: {}", - self.file_path - ); + debug!("No data written, try to stop the writer: {file_path}"); buffered_writer.close().await.context(WriteBufferSnafu)?; return Ok(None); @@ -124,6 +144,7 @@ impl ParquetWriter { file_size, num_rows: stats.num_rows, file_metadata: Some(Arc::new(parquet_metadata)), + inverted_index_available, })) } @@ -147,6 +168,24 @@ impl ParquetWriter { } } +async fn write_next_batch( + source: &mut Source, + write_format: &WriteFormat, + buffered_writer: &mut BufferedWriter, +) -> Result> { + let Some(batch) = source.next_batch().await? else { + return Ok(None); + }; + + let arrow_batch = write_format.convert_batch(&batch)?; + buffered_writer + .write(&arrow_batch) + .await + .context(WriteBufferSnafu)?; + + Ok(Some(batch)) +} + #[derive(Default)] struct SourceStats { /// Number of rows fetched. 
@@ -175,3 +214,109 @@ impl SourceStats { } } } + +#[derive(Default)] +struct Indexer { + file_id: FileId, + region_id: RegionId, + inner: Option<SstIndexCreator>, +} + +impl Indexer { + fn new( + file_id: FileId, + region_dir: String, + metadata: &RegionMetadataRef, + object_store: ObjectStore, + opts: &WriteOptions, + ) -> Self { + let Some(option) = &opts.inverted_index else { + debug!( + "Skip creating index due to config, region_id: {}, file_id: {}", + metadata.region_id, file_id, + ); + return Self::default(); + }; + + if metadata.primary_key.is_empty() { + debug!( + "No tag columns, skip creating index, region_id: {}, file_id: {}", + metadata.region_id, file_id, + ); + return Self::default(); + } + + let Some(row_group_size) = NonZeroUsize::new(opts.row_group_size) else { + warn!( + "Row group size is 0, skip creating index, region_id: {}, file_id: {}", + metadata.region_id, file_id, + ); + return Self::default(); + }; + + let creator = SstIndexCreator::new( + region_dir.clone(), + file_id, + metadata, + object_store.clone(), + object_store, + option.creation_memory_usage_threshold, + row_group_size, + ); + + Self { + file_id, + region_id: metadata.region_id, + inner: Some(creator), + } + } + + async fn update(&mut self, batch: &Batch) { + if let Some(creator) = self.inner.as_mut() { + if let Err(err) = creator.update(batch).await { + warn!( + err; "Failed to update index, skip creating index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + + // Skip index creation if error occurs. 
+ self.inner = None; + } + } + } + + async fn finish(mut self) -> bool { + if let Some(creator) = self.inner.as_mut() { + match creator.finish().await { + Ok((row_count, byte_count)) => { + debug!( + "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + return true; + } + Err(err) => { + warn!( + err; "Failed to create index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } + } + + false + } + + async fn abort(&mut self) { + if let Some(creator) = self.inner.as_mut() { + if let Err(err) = creator.abort().await { + warn!( + err; "Failed to abort index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + self.inner = None; + } + } +} diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 3638d119faa1..677eb00a3fc5 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -106,6 +106,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ), level: 0, file_size: 0, + inverted_index_available: false, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index e480b1f146df..be0dbd049b7c 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -96,6 +96,7 @@ impl VersionControlBuilder { ), level: 0, file_size: 0, // We don't care file size. + inverted_index_available: false, }, ); self @@ -136,6 +137,7 @@ pub(crate) fn apply_edit( ), level: 0, file_size: 0, // We don't care file size. 
+ inverted_index_available: false, } }) .collect(); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 351fa7cd0b95..057036e6d83c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -739,6 +739,11 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false +[datanode.region_engine.mito.inverted_index] +disable_creation_on_flush = false +disable_creation_on_compact = false +creation_memory_usage_threshold = "128MiB" + [[datanode.region_engine]] [datanode.region_engine.file]