Skip to content

Commit

Permalink
feat(mito): add options to ignore building index for specific column …
Browse files Browse the repository at this point in the history
…ids (#3295)

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Feb 16, 2024
1 parent 34050ea commit f9ce270
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 61 deletions.
4 changes: 4 additions & 0 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
Expand Down Expand Up @@ -143,6 +144,7 @@ impl AccessLayer {
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
}
.build();
let mut writer = ParquetWriter::new(
Expand Down Expand Up @@ -187,6 +189,8 @@ pub(crate) struct SstWriteRequest {
pub(crate) mem_threshold_index_create: Option<usize>,
/// The size of write buffer for index.
pub(crate) index_write_buffer_size: Option<usize>,
/// The options of the index for the region.
pub(crate) index_options: IndexOptions,
}

/// Creates a fs object store with atomic write dir.
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl WriteCache {
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
}
.build();

Expand Down Expand Up @@ -235,6 +236,7 @@ mod tests {
use super::*;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::sst::parquet::reader::ParquetReaderBuilder;
Expand Down Expand Up @@ -279,6 +281,7 @@ mod tests {
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -363,6 +366,7 @@ mod tests {
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::region::options::IndexOptions;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Picker for TwcsPicker {
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -251,6 +253,8 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -327,6 +331,7 @@ impl TwcsCompactionTask {
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
futs.push(async move {
let reader =
build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?;
Expand All @@ -341,6 +346,7 @@ impl TwcsCompactionTask {
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::error::{
use crate::memtable::MemtableBuilderRef;
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
Expand Down Expand Up @@ -203,6 +204,9 @@ pub(crate) struct RegionFlushTask {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) row_group_size: Option<usize>,
pub(crate) cache_manager: CacheManagerRef,

/// Index options for the region.
pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
Expand Down Expand Up @@ -338,6 +342,7 @@ impl RegionFlushTask {
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options: self.index_options.clone(),
};
let Some(sst_info) = self
.access_layer
Expand Down Expand Up @@ -766,6 +771,7 @@ mod tests {
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
index_options: IndexOptions::default(),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ impl ScanRegion {
self.access_layer.object_store().clone(),
file_cache,
self.version.metadata.as_ref(),
self.version
.options
.index_options
.inverted_index
.ignore_column_ids
.iter()
.copied()
.collect(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))
Expand Down
64 changes: 63 additions & 1 deletion src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use std::collections::HashMap;
use std::time::Duration;

use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use serde::Deserialize;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer};
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr};
use snafu::ResultExt;
use store_api::storage::ColumnId;

use crate::error::{Error, JsonOptionsSnafu, Result};

Expand All @@ -40,6 +42,8 @@ pub struct RegionOptions {
pub storage: Option<String>,
/// Wal options.
pub wal_options: WalOptions,
/// Index options.
pub index_options: IndexOptions,
}

impl TryFrom<&HashMap<String, String>> for RegionOptions {
Expand All @@ -64,11 +68,14 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
},
)?;

let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;

Ok(RegionOptions {
ttl: options.ttl,
compaction,
storage: options.storage,
wal_options,
index_options,
})
}
}
Expand Down Expand Up @@ -152,6 +159,40 @@ impl Default for RegionOptionsWithoutEnum {
}
}

with_prefix!(prefix_inverted_index "index.inverted_index.");

/// Options for index.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
/// Options for the inverted index.
#[serde(flatten, with = "prefix_inverted_index")]
pub inverted_index: InvertedIndexOptions,
}

/// Options for the inverted index.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
/// The column ids that should be ignored when building the inverted index.
/// The column ids are separated by commas. For example, "1,2,3".
#[serde(deserialize_with = "deserialize_ignore_column_ids")]
pub ignore_column_ids: Vec<ColumnId>,
}

fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut column_ids = Vec::new();
for item in s.split(',') {
let column_id = item.parse().map_err(D::Error::custom)?;
column_ids.push(column_id);
}
Ok(column_ids)
}

/// Converts the `options` map to a json object.
///
/// Converts all key-values to lowercase and replaces "null" strings by `null` json values.
Expand Down Expand Up @@ -257,6 +298,21 @@ mod tests {
expect == got
}

#[test]
fn test_with_index() {
let map = make_map(&[("index.inverted_index.ignore_column_ids", "1,2,3")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
},
},
..Default::default()
};
assert_eq!(expect, options);
}

// No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
#[test]
fn test_with_any_wal_options() {
Expand All @@ -281,6 +337,7 @@ mod tests {
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("storage", "S3"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
(
WAL_OPTIONS_KEY,
&serde_json::to_string(&wal_options).unwrap(),
Expand All @@ -296,6 +353,11 @@ mod tests {
}),
storage: Some("s3".to_string()),
wal_options,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
},
},
};
assert_eq!(expect, options);
}
Expand Down
16 changes: 15 additions & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;

Expand Down Expand Up @@ -132,6 +133,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) segment_row_count: usize,
pub(crate) object_store: ObjectStore,
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
}

impl<'a> IndexerBuilder<'a> {
Expand Down Expand Up @@ -184,7 +186,15 @@ impl<'a> IndexerBuilder<'a> {
self.mem_threshold_index_create,
segment_row_count,
)
.with_buffer_size(self.write_buffer_size);
.with_buffer_size(self.write_buffer_size)
.with_ignore_column_ids(
self.index_options
.inverted_index
.ignore_column_ids
.iter()
.map(|i| i.to_string())
.collect(),
);

Indexer {
file_id: self.file_id,
Expand Down Expand Up @@ -281,6 +291,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -301,6 +312,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -321,6 +333,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -341,6 +354,7 @@ mod tests {
row_group_size: 0,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand Down
Loading

0 comments on commit f9ce270

Please sign in to comment.