Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): add bloom filter config #5237

Merged
merged 11 commits into from
Dec 26, 2024
10 changes: 10 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down Expand Up @@ -486,6 +491,11 @@
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down
24 changes: 24 additions & 0 deletions config/datanode.example.toml
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,30 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter index in Mito engine.
[region_engine.mito.bloom_filter_index]

## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for the index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
24 changes: 24 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,30 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter_index]

## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator_batch_push() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
let mut creator: BloomFilterCreator = BloomFilterCreator::new(
2,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -154,6 +154,7 @@ impl AccessLayer {
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest {
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}

pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl WriteCache {
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
bloom_filter_index_config: write_request.bloom_filter_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -378,6 +379,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -470,6 +472,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ impl Compactor for DefaultCompactor {
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -325,6 +327,7 @@ impl Compactor for DefaultCompactor {
index_options,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
)
Expand Down
45 changes: 45 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct MitoConfig {
pub inverted_index: InvertedIndexConfig,
/// Full-text index configs.
pub fulltext_index: FulltextIndexConfig,
/// Bloom filter index configs.
pub bloom_filter_index: BloomFilterConfig,

/// Memtable config
pub memtable: MemtableConfig,
Expand Down Expand Up @@ -155,6 +157,7 @@ impl Default for MitoConfig {
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
Expand Down Expand Up @@ -511,6 +514,48 @@ impl FulltextIndexConfig {
}
}

/// Configuration options for the bloom filter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
/// Whether to create the index on flush: automatically or never.
pub create_on_flush: Mode,
/// Whether to create the index on compaction: automatically or never.
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Memory threshold for creating the index.
pub mem_threshold_on_create: MemoryThreshold,
}

impl Default for BloomFilterConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
}
}
}

impl BloomFilterConfig {
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}

/// Divide cpu num by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl EngineInner {
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
// .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start);

Ok(scan_region)
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl RegionFlushTask {
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
};
let Some(sst_info) = self
.access_layer
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ impl ScanRegion {

/// Sets whether to ignore bloom filter.
#[must_use]
#[allow(dead_code)] // TODO(ruihang): waiting for #5237
pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
self.ignore_bloom_filter = ignore;
self
Expand Down
44 changes: 40 additions & 4 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::OperationType;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -179,6 +179,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}

impl<'a> IndexerBuilder<'a> {
Expand Down Expand Up @@ -320,7 +321,10 @@ impl<'a> IndexerBuilder<'a> {
}

fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> {
let create = true; // TODO(zhongzc): add config for bloom filter
let create = match self.op_type {
OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
};

if !create {
debug!(
Expand All @@ -330,7 +334,7 @@ impl<'a> IndexerBuilder<'a> {
return None;
}

let mem_limit = Some(16 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter
let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
let indexer = BloomFilterIndexer::new(
self.file_id,
self.metadata,
Expand Down Expand Up @@ -496,6 +500,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down Expand Up @@ -530,12 +535,14 @@ mod tests {
..Default::default()
},
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;

assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_some());

let indexer = IndexerBuilder {
op_type: OperationType::Compact,
Expand All @@ -544,19 +551,44 @@ mod tests {
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;

assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
assert!(indexer.bloom_filter_indexer.is_some());

let indexer = IndexerBuilder {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig {
create_on_compaction: Mode::Disable,
..Default::default()
},
}
.build()
.await;

assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_none());
}

#[tokio::test]
Expand All @@ -581,6 +613,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand All @@ -605,6 +638,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand All @@ -629,6 +663,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down Expand Up @@ -660,6 +695,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down
Loading
Loading