Skip to content

Commit

Permalink
feat(config): add bloom filter config (#5237)
Browse files Browse the repository at this point in the history
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <[email protected]>

* feat(config) add bloom filter config

Signed-off-by: Zhenchi <[email protected]>

* fix

Signed-off-by: Zhenchi <[email protected]>

* fix docs

Signed-off-by: Zhenchi <[email protected]>

* address comments

Signed-off-by: Zhenchi <[email protected]>

* fix docs

Signed-off-by: Zhenchi <[email protected]>

* merge

Signed-off-by: Zhenchi <[email protected]>

* remove cache config

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Dec 26, 2024
1 parent 0cf44e1 commit f4b2d39
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 8 deletions.
10 changes: 10 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down Expand Up @@ -486,6 +491,11 @@
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down
24 changes: 24 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,30 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter index in Mito engine.
[region_engine.mito.bloom_filter_index]

## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for the index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
24 changes: 24 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,30 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter_index]

## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator_batch_push() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
let mut creator: BloomFilterCreator = BloomFilterCreator::new(
2,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -154,6 +154,7 @@ impl AccessLayer {
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest {
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}

pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl WriteCache {
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
bloom_filter_index_config: write_request.bloom_filter_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -378,6 +379,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -470,6 +472,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ impl Compactor for DefaultCompactor {
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -325,6 +327,7 @@ impl Compactor for DefaultCompactor {
index_options,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
)
Expand Down
45 changes: 45 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct MitoConfig {
pub inverted_index: InvertedIndexConfig,
/// Full-text index configs.
pub fulltext_index: FulltextIndexConfig,
/// Bloom filter index configs.
pub bloom_filter_index: BloomFilterConfig,

/// Memtable config
pub memtable: MemtableConfig,
Expand Down Expand Up @@ -155,6 +157,7 @@ impl Default for MitoConfig {
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
Expand Down Expand Up @@ -511,6 +514,48 @@ impl FulltextIndexConfig {
}
}

/// Configuration options for the bloom filter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
/// Whether to create the index on flush: automatically or never.
pub create_on_flush: Mode,
/// Whether to create the index on compaction: automatically or never.
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Memory threshold for creating the index.
pub mem_threshold_on_create: MemoryThreshold,
}

impl Default for BloomFilterConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
}
}
}

impl BloomFilterConfig {
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}

/// Divide cpu num by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl EngineInner {
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
// .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start);

Ok(scan_region)
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl RegionFlushTask {
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
};
let Some(sst_info) = self
.access_layer
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ impl ScanRegion {

/// Sets whether to ignore bloom filter.
#[must_use]
#[allow(dead_code)] // TODO(ruihang): waiting for #5237
pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
self.ignore_bloom_filter = ignore;
self
Expand Down
44 changes: 40 additions & 4 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::OperationType;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -179,6 +179,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}

impl<'a> IndexerBuilder<'a> {
Expand Down Expand Up @@ -320,7 +321,10 @@ impl<'a> IndexerBuilder<'a> {
}

fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> {
let create = true; // TODO(zhongzc): add config for bloom filter
let create = match self.op_type {
OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
};

if !create {
debug!(
Expand All @@ -330,7 +334,7 @@ impl<'a> IndexerBuilder<'a> {
return None;
}

let mem_limit = Some(16 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter
let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
let indexer = BloomFilterIndexer::new(
self.file_id,
self.metadata,
Expand Down Expand Up @@ -496,6 +500,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down Expand Up @@ -530,12 +535,14 @@ mod tests {
..Default::default()
},
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;

assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_some());

let indexer = IndexerBuilder {
op_type: OperationType::Compact,
Expand All @@ -544,19 +551,44 @@ mod tests {
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;

assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
assert!(indexer.bloom_filter_indexer.is_some());

let indexer = IndexerBuilder {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig {
create_on_compaction: Mode::Disable,
..Default::default()
},
}
.build()
.await;

assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_none());
}

#[tokio::test]
Expand All @@ -581,6 +613,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand All @@ -605,6 +638,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand All @@ -629,6 +663,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down Expand Up @@ -660,6 +695,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.await;
Expand Down
Loading

0 comments on commit f4b2d39

Please sign in to comment.