diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs
index e17f987b5c67..15674d696cd6 100644
--- a/src/index/src/inverted_index/create.rs
+++ b/src/index/src/inverted_index/create.rs
@@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;
/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
-pub trait InvertedIndexCreator {
+pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs
index 0dd2f7a5473b..24478d5e22d2 100644
--- a/src/index/src/inverted_index/search/index_apply.rs
+++ b/src/index/src/inverted_index/search/index_apply.rs
@@ -28,7 +28,7 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
-pub trait IndexApplier {
+pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply<'a>(
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index d057323083ae..1699c6880462 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -68,11 +68,19 @@ impl AccessLayer {
/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
- let path = location::sst_file_path(&self.region_dir, file_id);
+ let sst_path = location::sst_file_path(&self.region_dir, file_id);
self.object_store
- .delete(&path)
+ .delete(&sst_path)
.await
- .context(DeleteSstSnafu { file_id })
+ .context(DeleteSstSnafu { file_id })?;
+
+ let index_path = location::index_file_path(&self.region_dir, file_id);
+ self.object_store
+ .delete(&index_path)
+ .await
+ .context(OpenDalSnafu)
+ // Ignore the error if the index file is not found, for backward compatibility
+ .or_else(|e| e.is_object_not_found().then_some(()).ok_or(e))
}
/// Returns a reader builder for specific `file`.
@@ -88,7 +96,6 @@ impl AccessLayer {
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result> {
- let path = location::sst_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
@@ -100,7 +107,7 @@ impl AccessLayer {
metadata: request.metadata,
source: request.source,
storage: request.storage,
- upload_path: path,
+ region_dir: self.region_dir.clone(),
remote_store: self.object_store.clone(),
},
write_opts,
@@ -108,7 +115,12 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
- let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
+ let mut writer = ParquetWriter::new(
+ self.region_dir.clone(),
+ request.file_id,
+ request.metadata,
+ self.object_store.clone(),
+ );
writer.write_all(request.source, write_opts).await?
};
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 9775f3d79160..6d70afbe7b14 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -79,8 +79,12 @@ impl WriteCache {
) -> Result > {
// TODO(yingwen): Write to the local store and then upload.
// Now we write to the remote and ignore local cache.
- let mut writer =
- ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
+ let mut writer = ParquetWriter::new(
+ request.region_dir,
+ request.file_id,
+ request.metadata,
+ request.remote_store,
+ );
writer.write_all(request.source, write_opts).await
}
}
@@ -91,8 +95,7 @@ pub struct SstUploadRequest {
pub metadata: RegionMetadataRef,
pub source: Source,
pub storage: Option,
- /// Path to upload the file.
- pub upload_path: String,
+ pub region_dir: String,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 0ddcec61d0f2..7ab0f55091f9 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
-use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
@@ -53,9 +52,10 @@ pub struct CompactionRequest {
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
- /// Buffering threshold while writing SST files.
- pub(crate) sst_write_buffer_size: ReadableSize,
+
pub(crate) cache_manager: CacheManagerRef,
+
+ pub(crate) engine_config: Arc,
}
impl CompactionRequest {
@@ -337,8 +337,8 @@ impl CompactionStatus {
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
- sst_write_buffer_size: engine_config.sst_write_buffer_size,
cache_manager,
+ engine_config,
};
if let Some(pending) = self.pending_compaction.take() {
diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs
index fefae906ba69..1373049882de 100644
--- a/src/mito2/src/compaction/test_util.rs
+++ b/src/mito2/src/compaction/test_util.rs
@@ -35,6 +35,7 @@ pub fn new_file_handle(
),
level,
file_size: 0,
+ inverted_index_available: false,
},
file_purger,
)
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index e97030ac383c..780d965ffbba 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -17,7 +17,6 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
-use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
@@ -31,7 +30,9 @@ use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
-use crate::error::{self, CompactRegionSnafu};
+use crate::config::MitoConfig;
+use crate::error;
+use crate::error::CompactRegionSnafu;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
@@ -41,7 +42,7 @@ use crate::request::{
};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file_purger::FilePurgerRef;
-use crate::sst::parquet::WriteOptions;
+use crate::sst::parquet::{InvertedIndexOptions, WriteOptions};
use crate::sst::version::LevelMeta;
const MAX_PARALLEL_COMPACTION: usize = 8;
@@ -128,8 +129,8 @@ impl Picker for TwcsPicker {
waiters,
file_purger,
start_time,
- sst_write_buffer_size,
cache_manager,
+ engine_config,
} = req;
let region_metadata = current_version.metadata.clone();
@@ -177,7 +178,6 @@ impl Picker for TwcsPicker {
sst_layer: access_layer,
outputs,
expired_ssts,
- sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
request_sender,
waiters,
@@ -185,6 +185,7 @@ impl Picker for TwcsPicker {
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
+ engine_config,
};
Some(Box::new(task))
}
@@ -238,7 +239,6 @@ pub(crate) struct TwcsCompactionTask {
pub sst_layer: AccessLayerRef,
pub outputs: Vec,
pub expired_ssts: Vec,
- pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
@@ -250,6 +250,7 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option,
+ pub engine_config: Arc,
}
impl Debug for TwcsCompactionTask {
@@ -299,8 +300,17 @@ impl TwcsCompactionTask {
output.output_file_id
);
+ let index_config = &self.engine_config.inverted_index;
+ let inverted_index =
+ (!index_config.disable_creation_on_compact).then(|| InvertedIndexOptions {
+ creation_memory_usage_threshold: index_config
+ .creation_memory_usage_threshold
+ .map(|size| size.as_bytes() as _),
+ });
+
let write_opts = WriteOptions {
- write_buffer_size: self.sst_write_buffer_size,
+ write_buffer_size: self.engine_config.sst_write_buffer_size,
+ inverted_index,
..Default::default()
};
let metadata = self.metadata.clone();
@@ -330,6 +340,7 @@ impl TwcsCompactionTask {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
+ inverted_index_available: sst_info.inverted_index_available,
});
Ok(file_meta_opt)
});
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 0723c702ae70..80767ead78c5 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -89,6 +89,8 @@ pub struct MitoConfig {
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
+
+ pub inverted_index: InvertedIndexConfig,
}
impl Default for MitoConfig {
@@ -113,6 +115,7 @@ impl Default for MitoConfig {
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
+ inverted_index: InvertedIndexConfig::default(),
}
}
}
@@ -188,3 +191,20 @@ fn divide_num_cpus(divisor: usize) -> usize {
(cores + divisor - 1) / divisor
}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct InvertedIndexConfig {
+ pub disable_creation_on_flush: bool,
+ pub disable_creation_on_compact: bool,
+ pub creation_memory_usage_threshold: Option,
+}
+
+impl Default for InvertedIndexConfig {
+ fn default() -> Self {
+ InvertedIndexConfig {
+ disable_creation_on_flush: false,
+ disable_creation_on_compact: false,
+ creation_memory_usage_threshold: Some(ReadableSize::mb(128)),
+ }
+ }
+}
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 2e3dc4de0d53..ed8cd99e95da 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -553,5 +553,5 @@ async fn test_region_usage() {
assert_eq!(region_stat.sst_usage, 2742);
// region total usage
- assert_eq!(region_stat.disk_usage(), 3748);
+ assert_eq!(region_stat.disk_usage(), 3780);
}
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 10a1de0e5de2..ebcf173a5f88 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -41,7 +41,7 @@ use crate::request::{
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
-use crate::sst::parquet::WriteOptions;
+use crate::sst::parquet::{InvertedIndexOptions, WriteOptions};
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
@@ -294,8 +294,17 @@ impl RegionFlushTask {
.with_label_values(&["flush_memtables"])
.start_timer();
+ let inverted_index_config = &self.engine_config.inverted_index;
+ let inverted_index_options =
+ (!inverted_index_config.disable_creation_on_flush).then(|| InvertedIndexOptions {
+ creation_memory_usage_threshold: inverted_index_config
+ .creation_memory_usage_threshold
+ .map(|size| size.as_bytes() as _),
+ });
+
let mut write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
+ inverted_index: inverted_index_options,
..Default::default()
};
if let Some(row_group_size) = self.row_group_size {
@@ -339,6 +348,7 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
+ inverted_index_available: sst_info.inverted_index_available,
};
file_metas.push(file_meta);
}
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index c28f6cd6d598..805bb1c9914a 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -171,6 +171,7 @@ async fn checkpoint_with_different_compression_types() {
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
+ inverted_index_available: false,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index eb84d6d59599..7ca316e622b9 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -121,6 +121,9 @@ lazy_static! {
/// Counter of filtered rows during merge.
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
+ /// Counter of row groups read.
+ pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
+ register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.
// Cache related metrics.
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 4a8c7028357b..f3de2c64e3ae 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -14,8 +14,10 @@
//! Scans a region according to the scan request.
+use std::sync::Arc;
+
use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::debug;
+use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
@@ -27,6 +29,8 @@ use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
+use crate::sst::index::applier::builder::SstIndexApplierBuilder;
+use crate::sst::index::applier::SstIndexApplierRef;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -194,6 +198,7 @@ impl ScanRegion {
total_ssts
);
+ let index_applier = self.build_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
@@ -204,6 +209,7 @@ impl ScanRegion {
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
+ .with_index_applier(index_applier)
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager)
@@ -224,6 +230,24 @@ impl ScanRegion {
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
.build()
}
+
+ /// Use the latest schema to build the index applier.
+ ///
+ /// To safely apply this fixed schema across different versions of SSTs, we must ensure:
+ /// 1. Type of column cannot be changed.
+ /// 2. Column cannot be renamed.
+ fn build_index_applier(&self) -> Option {
+ SstIndexApplierBuilder::new(
+ self.access_layer.region_dir().to_string(),
+ self.access_layer.object_store().clone(),
+ self.version.metadata.as_ref(),
+ )
+ .build(&self.request.filters)
+ .inspect_err(|err| warn!(err; "Failed to build index applier"))
+ .ok()
+ .flatten()
+ .map(Arc::new)
+ }
}
/// Config for parallel scan.
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index f963568cad59..5a641f89cab5 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -39,6 +39,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanParallism;
use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
use crate::sst::file::FileHandle;
+use crate::sst::index::applier::SstIndexApplierRef;
/// Scans a region and returns rows in a sorted sequence.
///
@@ -62,6 +63,8 @@ pub struct SeqScan {
ignore_file_not_found: bool,
/// Parallelism to scan data.
parallelism: ScanParallism,
+
+ index_applier: Option,
}
impl SeqScan {
@@ -73,6 +76,7 @@ impl SeqScan {
mapper: Arc::new(mapper),
time_range: None,
predicate: None,
+ index_applier: None,
memtables: Vec::new(),
files: Vec::new(),
cache_manager: None,
@@ -95,6 +99,12 @@ impl SeqScan {
self
}
+ #[must_use]
+ pub(crate) fn with_index_applier(mut self, index_applier: Option) -> Self {
+ self.index_applier = index_applier;
+ self
+ }
+
/// Sets memtables to read.
#[must_use]
pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self {
@@ -210,6 +220,7 @@ impl SeqScan {
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
+ .index_applier(self.index_applier.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index d32133d1bcc6..59a1ed290879 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -95,6 +95,8 @@ pub struct FileMeta {
pub level: Level,
/// Size of the file.
pub file_size: u64,
+ /// Whether inverted index is available.
+ pub inverted_index_available: bool,
}
/// Handle to a SST file.
@@ -236,6 +238,7 @@ mod tests {
time_range: FileTimeRange::default(),
level,
file_size: 0,
+ inverted_index_available: false,
}
}
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 059b1956d7e2..95d631d7d8b9 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -137,6 +137,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
+ inverted_index_available: false,
},
file_purger,
);
diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs
index 0efc3f8e6ad0..4e7279ab2f11 100644
--- a/src/mito2/src/sst/index/applier.rs
+++ b/src/mito2/src/sst/index/applier.rs
@@ -15,6 +15,7 @@
pub mod builder;
use std::collections::BTreeSet;
+use std::sync::Arc;
use futures::{AsyncRead, AsyncSeek};
use index::inverted_index::format::reader::InvertedIndexBlobReader;
@@ -52,6 +53,8 @@ pub struct SstIndexApplier {
index_applier: Box,
}
+pub type SstIndexApplierRef = Arc;
+
impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
pub fn new(
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index fe328f16b121..978e84800e71 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -44,6 +44,8 @@ pub struct WriteOptions {
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
+ /// Inverted index options. If `None`, no inverted index will be created.
+ pub inverted_index: Option,
}
impl Default for WriteOptions {
@@ -51,10 +53,18 @@ impl Default for WriteOptions {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
+ inverted_index: Some(InvertedIndexOptions::default()),
}
}
}
+#[derive(Debug, Default)]
+pub struct InvertedIndexOptions {
+ /// The memory usage threshold for inverted index creation.
+ /// Set to a non-`None` value to enable external sorting during inverted index creation.
+ pub creation_memory_usage_threshold: Option,
+}
+
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// Time range of the SST.
@@ -65,6 +75,8 @@ pub struct SstInfo {
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option>,
+ /// Whether inverted index is available.
+ pub inverted_index_available: bool,
}
#[cfg(test)]
@@ -103,7 +115,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
- let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -116,7 +127,12 @@ mod tests {
..Default::default()
};
- let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone());
+ let mut writer = ParquetWriter::new(
+ FILE_DIR.to_string(),
+ handle.file_id(),
+ metadata,
+ object_store.clone(),
+ );
let info = writer
.write_all(source, &write_opts)
.await
@@ -152,7 +168,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
- let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -165,7 +180,12 @@ mod tests {
..Default::default()
};
// Prepare data.
- let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
+ let mut writer = ParquetWriter::new(
+ FILE_DIR.to_string(),
+ handle.file_id(),
+ metadata.clone(),
+ object_store.clone(),
+ );
writer
.write_all(source, &write_opts)
.await
@@ -220,7 +240,6 @@ mod tests {
let mut env = crate::test_util::TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
- let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -234,12 +253,18 @@ mod tests {
// write the sst file and get sst info
// sst info contains the parquet metadata, which is converted from FileMetaData
- let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
+ let mut writer = ParquetWriter::new(
+ FILE_DIR.to_string(),
+ handle.file_id(),
+ metadata.clone(),
+ object_store.clone(),
+ );
let sst_info = writer
.write_all(source, &write_opts)
.await
.unwrap()
.expect("write_all should return sst info");
+ assert!(sst_info.inverted_index_available);
let writer_metadata = sst_info.file_metadata.unwrap();
// read the sst file metadata
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 60729c664283..15592a2626ee 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -14,12 +14,12 @@
//! Parquet reader.
-use std::collections::{HashSet, VecDeque};
+use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use common_telemetry::debug;
+use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
@@ -39,9 +39,10 @@ use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu,
Result,
};
-use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
+use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
+use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -64,6 +65,8 @@ pub struct ParquetReaderBuilder {
projection: Option>,
/// Manager that caches SST data.
cache_manager: Option,
+
+ index_applier: Option,
}
impl ParquetReaderBuilder {
@@ -81,6 +84,7 @@ impl ParquetReaderBuilder {
time_range: None,
projection: None,
cache_manager: None,
+ index_applier: None,
}
}
@@ -110,6 +114,11 @@ impl ParquetReaderBuilder {
self
}
+ pub fn index_applier(mut self, index_applier: Option) -> Self {
+ self.index_applier = index_applier;
+ self
+ }
+
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -129,35 +138,8 @@ impl ParquetReaderBuilder {
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
- // Computes column ids to read.
- let column_ids: HashSet<_> = self
- .projection
- .as_ref()
- .map(|p| p.iter().cloned().collect())
- .unwrap_or_else(|| {
- region_meta
- .column_metadatas
- .iter()
- .map(|c| c.column_id)
- .collect()
- });
let read_format = ReadFormat::new(Arc::new(region_meta));
- // Prunes row groups by metadata.
- let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate {
- let stats =
- RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids);
-
- predicate
- .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
- .into_iter()
- .enumerate()
- .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
- .collect()
- } else {
- (0..parquet_meta.num_row_groups()).collect()
- };
-
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let projection_mask = if let Some(column_ids) = self.projection.as_ref() {
@@ -174,6 +156,13 @@ impl ParquetReaderBuilder {
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;
+ let mut metrics = Metrics::default();
+
+ // Computes row groups to read.
+ let row_groups = self
+ .row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
+ .await;
+
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
@@ -184,12 +173,7 @@ impl ParquetReaderBuilder {
cache_manager: self.cache_manager.clone(),
};
- let metrics = Metrics {
- read_row_groups: row_groups.len(),
- build_cost: start.elapsed(),
- ..Default::default()
- };
-
+ metrics.build_cost = start.elapsed();
Ok(ParquetReader {
row_groups,
read_format,
@@ -256,13 +240,67 @@ impl ParquetReaderBuilder {
Ok(metadata)
}
+
+ /// Computes row groups to read.
+ async fn row_groups_to_read(
+ &self,
+ read_format: &ReadFormat,
+ parquet_meta: &ParquetMetaData,
+ metrics: &mut Metrics,
+ ) -> BTreeSet {
+ let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect();
+ metrics.num_row_groups_unfiltered += row_group_ids.len();
+
+ // Applies index to prune row groups.
+ if let Some(index_applier) = &self.index_applier {
+ if self.file_handle.meta().inverted_index_available {
+ match index_applier.apply(self.file_handle.file_id()).await {
+ Ok(row_groups) => row_group_ids = row_groups,
+ Err(err) => warn!(err; "Failed to apply index"),
+ }
+ }
+ }
+ metrics.num_row_groups_inverted_index_filtered += row_group_ids.len();
+
+ // Prunes row groups by metadata.
+ if let Some(predicate) = &self.predicate {
+ let region_meta = read_format.metadata();
+ let column_ids = match &self.projection {
+ Some(ids) => ids.iter().cloned().collect(),
+ None => region_meta
+ .column_metadatas
+ .iter()
+ .map(|c| c.column_id)
+ .collect(),
+ };
+
+ let row_groups = parquet_meta.row_groups();
+ let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids);
+ let row_groups_to_prune = predicate
+ .prune_with_stats(&stats, region_meta.schema.arrow_schema())
+ .into_iter()
+ .enumerate()
+ .filter_map(|(id, remain)| (!remain).then_some(id));
+
+ for row_group_id in row_groups_to_prune {
+ row_group_ids.remove(&row_group_id);
+ }
+ };
+ metrics.num_row_groups_min_max_filtered += row_group_ids.len();
+
+ row_group_ids
+ }
}
/// Parquet reader metrics.
#[derive(Debug, Default)]
struct Metrics {
- /// Number of row groups to read.
- read_row_groups: usize,
+ /// Number of unfiltered row groups.
+ num_row_groups_unfiltered: usize,
+ /// Number of row groups to read after filtering by inverted index.
+ num_row_groups_inverted_index_filtered: usize,
+ /// Number of row groups to read after filtering by min-max index.
+ num_row_groups_min_max_filtered: usize,
/// Duration to build the parquet reader.
build_cost: Duration,
/// Duration to scan the reader.
@@ -337,7 +375,7 @@ impl RowGroupReaderBuilder {
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// Indices of row groups to read.
- row_groups: VecDeque,
+ row_groups: BTreeSet,
/// Helper to read record batches.
///
/// Not `None` if [ParquetReader::stream] is not `None`.
@@ -390,8 +428,8 @@ impl Drop for ParquetReader {
self.reader_builder.file_handle.region_id(),
self.reader_builder.file_handle.file_id(),
self.reader_builder.file_handle.time_range(),
- self.metrics.read_row_groups,
- self.reader_builder.parquet_meta.num_row_groups(),
+ self.metrics.num_row_groups_min_max_filtered,
+ self.metrics.num_row_groups_unfiltered,
self.metrics
);
@@ -405,6 +443,15 @@ impl Drop for ParquetReader {
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows as u64);
+ READ_ROW_GROUPS_TOTAL
+ .with_label_values(&["unfiltered"])
+ .inc_by(self.metrics.num_row_groups_unfiltered as u64);
+ READ_ROW_GROUPS_TOTAL
+ .with_label_values(&["inverted_index_filtered"])
+ .inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64);
+ READ_ROW_GROUPS_TOTAL
+ .with_label_values(&["min_max_filtered"])
+ .inc_by(self.metrics.num_row_groups_min_max_filtered as u64);
}
}
@@ -432,7 +479,7 @@ impl ParquetReader {
}
// No more items in current row group, reads next row group.
- while let Some(row_group_idx) = self.row_groups.pop_front() {
+ while let Some(row_group_idx) = self.row_groups.pop_first() {
let mut row_group_reader = self.reader_builder.build(row_group_idx).await?;
let Some(record_batch) =
row_group_reader
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 2ed226791dca..7ff0bb68bcd2 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -14,11 +14,13 @@
//! Parquet writer.
+use std::num::NonZeroUsize;
use std::sync::Arc;
use common_datasource::file_format::parquet::BufferedWriter;
-use common_telemetry::debug;
+use common_telemetry::{debug, warn};
use common_time::Timestamp;
+use futures::TryFutureExt;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
@@ -27,17 +29,23 @@ use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
+use store_api::storage::RegionId;
use super::helper::parse_parquet_metadata;
use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
use crate::read::{Batch, Source};
+use crate::sst::file::FileId;
+use crate::sst::index::creator::SstIndexCreator;
+use crate::sst::location;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
/// Parquet SST writer.
pub struct ParquetWriter {
- /// SST output file path.
- file_path: String,
+ /// Directory of the region.
+ region_dir: String,
+ /// SST file id.
+ file_id: FileId,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
object_store: ObjectStore,
@@ -46,12 +54,14 @@ pub struct ParquetWriter {
impl ParquetWriter {
/// Creates a new parquet SST writer.
pub fn new(
- file_path: String,
+ region_dir: String,
+ file_id: FileId,
metadata: RegionMetadataRef,
object_store: ObjectStore,
) -> ParquetWriter {
ParquetWriter {
- file_path,
+ region_dir,
+ file_id,
metadata,
object_store,
}
@@ -78,9 +88,10 @@ impl ParquetWriter {
let props_builder = Self::customize_column_config(props_builder, &self.metadata);
let writer_props = props_builder.build();
+ let file_path = location::sst_file_path(&self.region_dir, self.file_id);
let write_format = WriteFormat::new(self.metadata.clone());
let mut buffered_writer = BufferedWriter::try_new(
- self.file_path.clone(),
+ file_path.clone(),
self.object_store.clone(),
write_format.arrow_schema(),
Some(writer_props),
@@ -90,21 +101,30 @@ impl ParquetWriter {
.context(WriteBufferSnafu)?;
let mut stats = SourceStats::default();
- while let Some(batch) = source.next_batch().await? {
- stats.update(&batch);
- let arrow_batch = write_format.convert_batch(&batch)?;
+ let mut index = Indexer::new(
+ self.file_id,
+ self.region_dir.clone(),
+ &self.metadata,
+ self.object_store.clone(),
+ opts,
+ );
- buffered_writer
- .write(&arrow_batch)
- .await
- .context(WriteBufferSnafu)?;
+ while let Some(batch) = write_next_batch(&mut source, &write_format, &mut buffered_writer)
+ .or_else(|err| async {
+ // abort index creation if error occurs.
+ index.abort().await;
+ Err(err)
+ })
+ .await?
+ {
+ stats.update(&batch);
+ index.update(&batch).await;
}
+ let inverted_index_available = index.finish().await;
+
if stats.num_rows == 0 {
- debug!(
- "No data written, try to stop the writer: {}",
- self.file_path
- );
+ debug!("No data written, try to stop the writer: {file_path}");
buffered_writer.close().await.context(WriteBufferSnafu)?;
return Ok(None);
@@ -124,6 +144,7 @@ impl ParquetWriter {
file_size,
num_rows: stats.num_rows,
file_metadata: Some(Arc::new(parquet_metadata)),
+ inverted_index_available,
}))
}
@@ -147,6 +168,24 @@ impl ParquetWriter {
}
}
+async fn write_next_batch(
+ source: &mut Source,
+ write_format: &WriteFormat,
+ buffered_writer: &mut BufferedWriter,
+) -> Result<Option<Batch>> {
+ let Some(batch) = source.next_batch().await? else {
+ return Ok(None);
+ };
+
+ let arrow_batch = write_format.convert_batch(&batch)?;
+ buffered_writer
+ .write(&arrow_batch)
+ .await
+ .context(WriteBufferSnafu)?;
+
+ Ok(Some(batch))
+}
+
#[derive(Default)]
struct SourceStats {
/// Number of rows fetched.
@@ -175,3 +214,109 @@ impl SourceStats {
}
}
}
+
+#[derive(Default)]
+struct Indexer {
+ file_id: FileId,
+ region_id: RegionId,
+ inner: Option<SstIndexCreator>,
+}
+
+impl Indexer {
+ fn new(
+ file_id: FileId,
+ region_dir: String,
+ metadata: &RegionMetadataRef,
+ object_store: ObjectStore,
+ opts: &WriteOptions,
+ ) -> Self {
+ let Some(option) = &opts.inverted_index else {
+ debug!(
+ "Skip creating index due to config, region_id: {}, file_id: {}",
+ metadata.region_id, file_id,
+ );
+ return Self::default();
+ };
+
+ if metadata.primary_key.is_empty() {
+ debug!(
+ "No tag columns, skip creating index, region_id: {}, file_id: {}",
+ metadata.region_id, file_id,
+ );
+ return Self::default();
+ }
+
+ let Some(row_group_size) = NonZeroUsize::new(opts.row_group_size) else {
+ warn!(
+ "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
+ metadata.region_id, file_id,
+ );
+ return Self::default();
+ };
+
+ let creator = SstIndexCreator::new(
+ region_dir.clone(),
+ file_id,
+ metadata,
+ object_store.clone(),
+ object_store,
+ option.creation_memory_usage_threshold,
+ row_group_size,
+ );
+
+ Self {
+ file_id,
+ region_id: metadata.region_id,
+ inner: Some(creator),
+ }
+ }
+
+ async fn update(&mut self, batch: &Batch) {
+ if let Some(creator) = self.inner.as_mut() {
+ if let Err(err) = creator.update(batch).await {
+ warn!(
+ err; "Failed to update index, skip creating index, region_id: {}, file_id: {}",
+ self.region_id, self.file_id,
+ );
+
+ // Skip index creation if error occurs.
+ self.inner = None;
+ }
+ }
+ }
+
+ async fn finish(mut self) -> bool {
+ if let Some(creator) = self.inner.as_mut() {
+ match creator.finish().await {
+ Ok((row_count, byte_count)) => {
+ debug!(
+ "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}",
+ self.region_id, self.file_id, byte_count, row_count
+ );
+ return true;
+ }
+ Err(err) => {
+ warn!(
+ err; "Failed to create index, region_id: {}, file_id: {}",
+ self.region_id, self.file_id,
+ );
+ }
+ }
+ }
+
+ false
+ }
+
+ async fn abort(&mut self) {
+ if let Some(creator) = self.inner.as_mut() {
+ if let Err(err) = creator.abort().await {
+ warn!(
+ err; "Failed to abort index, region_id: {}, file_id: {}",
+ self.region_id, self.file_id,
+ );
+ }
+
+ self.inner = None;
+ }
+ }
+}
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 3638d119faa1..677eb00a3fc5 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -106,6 +106,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
),
level: 0,
file_size: 0,
+ inverted_index_available: false,
},
file_purger,
)
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index e480b1f146df..be0dbd049b7c 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -96,6 +96,7 @@ impl VersionControlBuilder {
),
level: 0,
file_size: 0, // We don't care file size.
+ inverted_index_available: false,
},
);
self
@@ -136,6 +137,7 @@ pub(crate) fn apply_edit(
),
level: 0,
file_size: 0, // We don't care file size.
+ inverted_index_available: false,
}
})
.collect();
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 351fa7cd0b95..057036e6d83c 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -739,6 +739,11 @@ sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
+[datanode.region_engine.mito.inverted_index]
+disable_creation_on_flush = false
+disable_creation_on_compact = false
+creation_memory_usage_threshold = "128MiB"
+
[[datanode.region_engine]]
[datanode.region_engine.file]