From 69fedbf0eed03497343ac9896ccfd59b17e4c47f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 19 Dec 2024 19:13:31 +0800 Subject: [PATCH 01/13] wip Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/index/Cargo.toml | 1 + src/index/src/bloom_filter.rs | 1 + src/index/src/bloom_filter/applier.rs | 112 ++++++++++++++++++++++++++ 4 files changed, 115 insertions(+) create mode 100644 src/index/src/bloom_filter/applier.rs diff --git a/Cargo.lock b/Cargo.lock index fa8ba34d1a3b..48c9f3c2e843 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5219,6 +5219,7 @@ dependencies = [ "futures", "greptime-proto", "mockall", + "parquet", "pin-project", "prost 0.12.6", "rand", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index f46c64a17606..898be43b7d56 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -22,6 +22,7 @@ fst.workspace = true futures.workspace = true greptime-proto.workspace = true mockall.workspace = true +parquet.workspace = true pin-project.workspace = true prost.workspace = true regex.workspace = true diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index e68acc698a26..e5217c49de9d 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; +pub mod applier; pub mod creator; mod error; diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs new file mode 100644 index 000000000000..098358ae2eee --- /dev/null +++ b/src/index/src/bloom_filter/applier.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashSet}; +use std::ops::Range; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use fastbloom::BloomFilter; +use parquet::arrow::arrow_reader::RowSelection; +use parquet::file::metadata::RowGroupMetaData; + +use super::error::Result; +use super::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; + +// format +#[async_trait] +pub trait BloomFilterReader { + async fn range_read(&mut self, offset: u64, size: u32) -> Result>; + + async fn read_vec(&mut self, ranges: &[Range]) -> Result>>; + + async fn metadata(&mut self) -> Result; + + async fn bloom_filter(&mut self, loc: &BloomFilterSegmentLocation) -> Result; +} +// end of format + +pub struct BloomFilterApplier { + reader: Box, + meta: BloomFilterMeta, +} + +impl BloomFilterApplier { + pub async fn new(mut reader: Box) -> Result { + let meta = reader.metadata().await?; + + Ok(Self { reader, meta }) + } + + pub async fn search( + &mut self, + probes: &[Bytes], + row_group_metas: &[RowGroupMetaData], + basement: &BTreeMap>, + ) -> Result> { + // 0. Fast path - if basement is empty return empty vec + if basement.is_empty() { + return Ok(Vec::new()); + } + + // 1. Compute prefix sum for row counts + let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1); + prefix_sum.push(0); + for meta in row_group_metas { + prefix_sum.push(prefix_sum.last().unwrap() + meta.num_rows()); + } + + // 2. Calculate bloom filter segment locations + let mut segment_locations = Vec::new(); + for (&row_group_idx, _) in basement { + if row_group_idx >= row_group_metas.len() { + continue; + } + + segment_locations.push(BloomFilterSegmentLocation { + start_row: prefix_sum[row_group_idx], + end_row: prefix_sum[row_group_idx + 1], + }); + } + + // 3. Probe each bloom filter segment + let mut matched_locations = Vec::new(); + let mut unique_segments = HashSet::new(); + + for loc in segment_locations { + // Skip if we've already checked this segment + if !unique_segments.insert((loc.start_row, loc.end_row)) { + continue; + } + + let bloom = self.reader.bloom_filter(&loc).await?; + let mut matches = true; + + // Check if all probes exist in bloom filter + for probe in probes { + if !bloom.contains(probe) { + matches = false; + break; + } + } + + if matches { + matched_locations.push(loc); + } + } + + Ok(matched_locations) + } +} From 7db0c6c581eeabd8b3638f42ed6781339645d95b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 20 Dec 2024 16:06:27 +0800 Subject: [PATCH 02/13] draft search logic Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 68 ++++++++++++--------------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 098358ae2eee..30c3244c203e 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; use std::ops::Range; -use std::sync::Arc; -use std::time::Instant; use async_trait::async_trait; use fastbloom::BloomFilter; @@ -62,51 +60,47 @@ impl BloomFilterApplier { } // 1. Compute prefix sum for row counts + let mut sum = 0usize; let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1); - prefix_sum.push(0); + prefix_sum.push(0usize); for meta in row_group_metas { - prefix_sum.push(prefix_sum.last().unwrap() + meta.num_rows()); + sum += meta.num_rows() as usize; + prefix_sum.push(sum); } // 2. Calculate bloom filter segment locations let mut segment_locations = Vec::new(); for (&row_group_idx, _) in basement { - if row_group_idx >= row_group_metas.len() { - continue; - } - - segment_locations.push(BloomFilterSegmentLocation { - start_row: prefix_sum[row_group_idx], - end_row: prefix_sum[row_group_idx + 1], - }); - } - - // 3. Probe each bloom filter segment - let mut matched_locations = Vec::new(); - let mut unique_segments = HashSet::new(); - - for loc in segment_locations { - // Skip if we've already checked this segment - if !unique_segments.insert((loc.start_row, loc.end_row)) { - continue; - } - - let bloom = self.reader.bloom_filter(&loc).await?; - let mut matches = true; - - // Check if all probes exist in bloom filter - for probe in probes { - if !bloom.contains(probe) { - matches = false; - break; + // TODO(ruihang): support further filter over row selection + + // todo: dedup & overlap + let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment; + let rows_range_end = prefix_sum[row_group_idx + 1] / self.meta.rows_per_segment; + + for i in rows_range_start..rows_range_end { + // 3. Probe each bloom filter segment + let loc = BloomFilterSegmentLocation { + offset: self.meta.bloom_filter_segments[i].offset, + size: self.meta.bloom_filter_segments[i].size, + elem_count: self.meta.bloom_filter_segments[i].elem_count, + }; + let bloom = self.reader.bloom_filter(&loc).await?; + + // Check if all probes exist in bloom filter + let mut matches = true; + for probe in probes { + if !bloom.contains(probe) { + matches = false; + break; + } } - } - if matches { - matched_locations.push(loc); + if matches { + segment_locations.push(loc); + } } } - Ok(matched_locations) + Ok(segment_locations) } } From 009bd6736bdc4880267e27e5adb7e8655b6ff316 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 24 Dec 2024 12:44:52 +0800 Subject: [PATCH 03/13] use defined BloomFilterReader Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 30c3244c203e..28a7291922d3 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -13,36 +13,21 @@ // limitations under the License. use std::collections::BTreeMap; -use std::ops::Range; -use async_trait::async_trait; -use fastbloom::BloomFilter; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::RowGroupMetaData; -use super::error::Result; -use super::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; - -// format -#[async_trait] -pub trait BloomFilterReader { - async fn range_read(&mut self, offset: u64, size: u32) -> Result>; - - async fn read_vec(&mut self, ranges: &[Range]) -> Result>>; - - async fn metadata(&mut self) -> Result; - - async fn bloom_filter(&mut self, loc: &BloomFilterSegmentLocation) -> Result; -} -// end of format +use crate::bloom_filter::error::Result; +use crate::bloom_filter::reader::BloomFilterReader; +use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; pub struct BloomFilterApplier { - reader: Box, + reader: Box, meta: BloomFilterMeta, } impl BloomFilterApplier { - pub async fn new(mut reader: Box) -> Result { + pub async fn new(mut reader: Box) -> Result { let meta = reader.metadata().await?; Ok(Self { reader, meta }) From d2cdbac36b65c082871eb789651a129b8e0f9910 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 24 Dec 2024 14:19:27 +0800 Subject: [PATCH 04/13] fix clippy Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 28a7291922d3..880c7443f5a9 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -55,7 +55,7 @@ impl BloomFilterApplier { // 2. Calculate bloom filter segment locations let mut segment_locations = Vec::new(); - for (&row_group_idx, _) in basement { + for &row_group_idx in basement.keys() { // TODO(ruihang): support further filter over row selection // todo: dedup & overlap From 235bb359585de2bb4d4a9cfa3c656d8ed7817bb1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Dec 2024 17:06:57 +0800 Subject: [PATCH 05/13] round the range end Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 880c7443f5a9..30c2590eeb10 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -60,7 +60,9 @@ impl BloomFilterApplier { // todo: dedup & overlap let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment; - let rows_range_end = prefix_sum[row_group_idx + 1] / self.meta.rows_per_segment; + let rows_range_end = (prefix_sum[row_group_idx + 1] as f64 + / self.meta.rows_per_segment as f64) + .ceil() as usize; for i in rows_range_start..rows_range_end { // 3. Probe each bloom filter segment From faf3524b219441f5696cf17deeca7a4529fc4b1e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Dec 2024 23:06:54 +0800 Subject: [PATCH 06/13] finish index applier Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter.rs | 4 +-- src/index/src/bloom_filter/applier.rs | 40 ++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index ebd8a6076220..600f6e80e84d 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -26,7 +26,7 @@ pub type BytesRef<'a> = &'a [u8]; pub const SEED: u128 = 42; /// The Meta information of the bloom filter stored in the file. -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct BloomFilterMeta { /// The number of rows per segment. pub rows_per_segment: usize, @@ -45,7 +45,7 @@ pub struct BloomFilterMeta { } /// The location of the bloom filter segment in the file. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] pub struct BloomFilterSegmentLocation { /// The offset of the bloom filter segment in the file. pub offset: u64, diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 30c2590eeb10..c37d8cdbcb56 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::RowGroupMetaData; @@ -21,6 +21,21 @@ use crate::bloom_filter::error::Result; use crate::bloom_filter::reader::BloomFilterReader; use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; +/// Enumerates types of predicates for value filtering. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Predicate { + /// Predicate for matching values in a list. + InList(InListPredicate), +} + +/// `InListPredicate` contains a list of acceptable values. A value needs to match at least +/// one of the elements (logical OR semantic) for the predicate to be satisfied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InListPredicate { + /// List of acceptable values. + pub list: HashSet, +} + pub struct BloomFilterApplier { reader: Box, meta: BloomFilterMeta, @@ -35,13 +50,13 @@ impl BloomFilterApplier { pub async fn search( &mut self, - probes: &[Bytes], + probes: &HashSet, row_group_metas: &[RowGroupMetaData], - basement: &BTreeMap>, - ) -> Result> { + basement: &mut BTreeMap>, + ) -> Result> { // 0. Fast path - if basement is empty return empty vec if basement.is_empty() { - return Ok(Vec::new()); + return Ok(HashSet::new()); } // 1. Compute prefix sum for row counts @@ -54,7 +69,8 @@ impl BloomFilterApplier { } // 2. Calculate bloom filter segment locations - let mut segment_locations = Vec::new(); + let mut row_groups_to_remove = HashSet::new(); + let mut segment_locations = HashSet::new(); for &row_group_idx in basement.keys() { // TODO(ruihang): support further filter over row selection @@ -64,6 +80,7 @@ impl BloomFilterApplier { / self.meta.rows_per_segment as f64) .ceil() as usize; + let mut is_any_range_hit = false; for i in rows_range_start..rows_range_end { // 3. Probe each bloom filter segment let loc = BloomFilterSegmentLocation { @@ -83,9 +100,18 @@ impl BloomFilterApplier { } if matches { - segment_locations.push(loc); + segment_locations.insert(loc); } + is_any_range_hit |= matches; } + if !is_any_range_hit { + row_groups_to_remove.insert(row_group_idx); + } + } + + // 4. Remove row groups that do not match any bloom filter segment + for row_group_idx in row_groups_to_remove { + basement.remove(&row_group_idx); } Ok(segment_locations) From a7111bace778a4cadafa3604cf4a52394c25a659 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Dec 2024 23:14:37 +0800 Subject: [PATCH 07/13] integrate applier into mito2 with cache layer Signed-off-by: Ruihang Xia --- src/mito2/src/cache.rs | 14 + src/mito2/src/cache/index.rs | 1 + .../src/cache/index/bloom_filter_index.rs | 118 ++++ src/mito2/src/engine.rs | 1 + src/mito2/src/error.rs | 17 + src/mito2/src/read/scan_region.rs | 70 +++ src/mito2/src/sst/file.rs | 17 + src/mito2/src/sst/index.rs | 2 + src/mito2/src/sst/index/bloom_filter.rs | 17 + .../src/sst/index/bloom_filter/applier.rs | 561 ++++++++++++++++++ src/mito2/src/sst/index/inverted_index.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 62 ++ 12 files changed, 881 insertions(+), 1 deletion(-) create mode 100644 src/mito2/src/cache/index/bloom_filter_index.rs create mode 100644 src/mito2/src/sst/index/bloom_filter.rs create mode 100644 src/mito2/src/sst/index/bloom_filter/applier.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index a1864c55179e..77577926a2a3 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use bytes::Bytes; use datatypes::value::Value; use datatypes::vectors::VectorRef; +use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}; use moka::notification::RemovalCause; use moka::sync::Cache; use parquet::column::page::Page; @@ -69,6 +70,8 @@ pub struct CacheManager { write_cache: Option, /// Cache for inverted index. index_cache: Option, + /// Cache for bloom filter index. + bloom_filter_index_cache: Option, /// Puffin metadata cache. puffin_metadata_cache: Option, /// Cache for time series selectors. @@ -221,6 +224,10 @@ impl CacheManager { self.index_cache.as_ref() } + pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> { + self.bloom_filter_index_cache.as_ref() + } + pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { self.puffin_metadata_cache.as_ref() } @@ -364,6 +371,12 @@ impl CacheManagerBuilder { self.index_content_size, self.index_content_page_size, ); + // TODO(ruihang): check if it's ok to reuse the same param with inverted index + let bloom_filter_index_cache = BloomFilterIndexCache::new( + self.index_metadata_size, + self.index_content_size, + self.index_content_page_size, + ); let puffin_metadata_cache = PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { @@ -387,6 +400,7 @@ impl CacheManagerBuilder { page_cache, write_cache: self.write_cache, index_cache: Some(Arc::new(inverted_index_cache)), + bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)), puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, } diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index c8bd7a8f329b..137dc3d87454 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod bloom_filter_index; pub mod inverted_index; use std::future::Future; diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs new file mode 100644 index 000000000000..f970f6d8029b --- /dev/null +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -0,0 +1,118 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use index::bloom_filter::error::Result; +use index::bloom_filter::reader::BloomFilterReader; +use index::bloom_filter::BloomFilterMeta; + +use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE}; +use crate::metrics::{CACHE_HIT, CACHE_MISS}; +use crate::sst::file::FileId; + +const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; + +/// Cache for bloom filter index. +pub type BloomFilterIndexCache = IndexCache; +pub type BloomFilterIndexCacheRef = Arc; + +impl BloomFilterIndexCache { + /// Creates a new bloom filter index cache. + pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self { + Self::new_with_weighter( + index_metadata_cap, + index_content_cap, + page_size, + INDEX_TYPE_BLOOM_FILTER_INDEX, + bloom_filter_index_metadata_weight, + bloom_filter_index_content_weight, + ) + } +} + +/// Calculates weight for bloom filter index metadata. +fn bloom_filter_index_metadata_weight(k: &FileId, _: &Arc) -> u32 { + (k.as_bytes().len() + std::mem::size_of::()) as u32 +} + +/// Calculates weight for bloom filter index content. +fn bloom_filter_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { + (k.as_bytes().len() + v.len()) as u32 +} + +/// Bloom filter index blob reader with cache. +pub struct CachedBloomFilterIndexBlobReader { + file_id: FileId, + file_size: u64, + inner: R, + cache: BloomFilterIndexCacheRef, +} + +impl CachedBloomFilterIndexBlobReader { + /// Creates a new bloom filter index blob reader with cache. + pub fn new(file_id: FileId, file_size: u64, inner: R, cache: BloomFilterIndexCacheRef) -> Self { + Self { + file_id, + file_size, + inner, + cache, + } + } +} + +#[async_trait] +impl BloomFilterReader for CachedBloomFilterIndexBlobReader { + async fn range_read(&mut self, offset: u64, size: u32) -> Result { + let inner = &mut self.inner; + self.cache + .get_or_load( + self.file_id, + self.file_size, + offset, + size, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await + .map(|b| b.into()) + } + + /// Reads bunch of ranges from the file. + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + let mut results = Vec::with_capacity(ranges.len()); + for range in ranges { + let size = (range.end - range.start) as u32; + let data = self.range_read(range.start, size).await?; + results.push(data); + } + Ok(results) + } + + /// Reads the meta information of the bloom filter. + async fn metadata(&mut self) -> Result { + if let Some(cached) = self.cache.get_metadata(self.file_id) { + CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok((*cached).clone()) + } else { + let meta = self.inner.metadata().await?; + self.cache + .put_metadata(self.file_id, Arc::new(meta.clone())); + CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok(meta) + } + } +} diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 9b912318e16b..98160eadc46a 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -433,6 +433,7 @@ impl EngineInner { .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) + // .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237 .with_start_time(query_start); Ok(scan_region) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1baffd4a7fa1..5bbe302af0b7 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -562,6 +562,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build bloom filter index applier"))] + BuildBloomFilterIndexApplier { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to convert value"))] ConvertValue { source: datatypes::error::Error, @@ -576,6 +583,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to apply bloom filter index"))] + ApplyBloomFilterIndex { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to push index value"))] PushIndexValue { source: index::inverted_index::error::Error, @@ -1008,6 +1022,9 @@ impl ErrorExt for Error { EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), + BuildBloomFilterIndexApplier { source, .. } | ApplyBloomFilterIndex { source, .. } => { + source.status_code() + } BuildIndexApplier { source, .. } | PushIndexValue { source, .. } | ApplyInvertedIndex { source, .. } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2dfa22f9f1c9..2ce3367b409b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -47,6 +47,9 @@ use crate::read::{Batch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; +use crate::sst::index::bloom_filter::applier::{ + BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef, +}; use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; @@ -175,6 +178,8 @@ pub(crate) struct ScanRegion { ignore_inverted_index: bool, /// Whether to ignore fulltext index. ignore_fulltext_index: bool, + /// Whether to ignore bloom filter. + ignore_bloom_filter: bool, /// Start time of the scan task. start_time: Option, } @@ -195,6 +200,7 @@ impl ScanRegion { parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, ignore_inverted_index: false, ignore_fulltext_index: false, + ignore_bloom_filter: false, start_time: None, } } @@ -223,6 +229,14 @@ impl ScanRegion { self } + /// Sets whether to ignore bloom filter. + #[must_use] + #[allow(dead_code)] // TODO(ruihang): waiting for #5237 + pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self { + self.ignore_bloom_filter = ignore; + self + } + #[must_use] pub(crate) fn with_start_time(mut self, now: Instant) -> Self { self.start_time = Some(now); @@ -322,6 +336,7 @@ impl ScanRegion { self.maybe_remove_field_filters(); let inverted_index_applier = self.build_invereted_index_applier(); + let bloom_filter_applier = self.build_bloom_filter_applier(); let fulltext_index_applier = self.build_fulltext_index_applier(); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. @@ -345,6 +360,7 @@ impl ScanRegion { .with_files(files) .with_cache(self.cache_manager) .with_inverted_index_applier(inverted_index_applier) + .with_bloom_filter_index_applier(bloom_filter_applier) .with_fulltext_index_applier(fulltext_index_applier) .with_parallel_scan_channel_size(self.parallel_scan_channel_size) .with_start_time(self.start_time) @@ -448,6 +464,47 @@ impl ScanRegion { .map(Arc::new) } + /// Use the latest schema to build the bloom filter index applier. + fn build_bloom_filter_applier(&self) -> Option { + if self.ignore_bloom_filter { + return None; + } + + let file_cache = || -> Option { + let cache_manager = self.cache_manager.as_ref()?; + let write_cache = cache_manager.write_cache()?; + let file_cache = write_cache.file_cache(); + Some(file_cache) + }(); + + let index_cache = self + .cache_manager + .as_ref() + .and_then(|c| c.bloom_filter_index_cache()) + .cloned(); + + let puffin_metadata_cache = self + .cache_manager + .as_ref() + .and_then(|c| c.puffin_metadata_cache()) + .cloned(); + + BloomFilterIndexApplierBuilder::new( + self.access_layer.region_dir().to_string(), + self.access_layer.object_store().clone(), + self.version.metadata.as_ref(), + self.access_layer.puffin_manager_factory().clone(), + ) + .with_file_cache(file_cache) + .with_bloom_filter_index_cache(index_cache) + .with_puffin_metadata_cache(puffin_metadata_cache) + .build(&self.request.filters) + .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier")) + .ok() + .flatten() + .map(Arc::new) + } + /// Use the latest schema to build the fulltext index applier. fn build_fulltext_index_applier(&self) -> Option { if self.ignore_fulltext_index { @@ -501,6 +558,7 @@ pub(crate) struct ScanInput { pub(crate) parallel_scan_channel_size: usize, /// Index appliers. inverted_index_applier: Option, + bloom_filter_index_applier: Option, fulltext_index_applier: Option, /// Start time of the query. pub(crate) query_start: Option, @@ -529,6 +587,7 @@ impl ScanInput { ignore_file_not_found: false, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, + bloom_filter_index_applier: None, fulltext_index_applier: None, query_start: None, append_mode: false, @@ -600,6 +659,16 @@ impl ScanInput { self } + /// Sets bloom filter applier. + #[must_use] + pub(crate) fn with_bloom_filter_index_applier( + mut self, + applier: Option, + ) -> Self { + self.bloom_filter_index_applier = applier; + self + } + /// Sets fulltext index applier. #[must_use] pub(crate) fn with_fulltext_index_applier( @@ -694,6 +763,7 @@ impl ScanInput { .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) .inverted_index_applier(self.inverted_index_applier.clone()) + .bloom_filter_index_applier(self.bloom_filter_index_applier.clone()) .fulltext_index_applier(self.fulltext_index_applier.clone()) .expected_metadata(Some(self.mapper.metadata().clone())) .build_reader_input(reader_metrics) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 5a9932ab433b..33166d99cd86 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -143,6 +143,8 @@ pub enum IndexType { InvertedIndex, /// Full-text index. FulltextIndex, + /// Bloom Filter index + BloomFilterIndex, } impl FileMeta { @@ -156,6 +158,12 @@ impl FileMeta { self.available_indexes.contains(&IndexType::FulltextIndex) } + /// Returns true if the file has a bloom filter index. + pub fn bloom_filter_index_available(&self) -> bool { + self.available_indexes + .contains(&IndexType::BloomFilterIndex) + } + /// Returns the size of the inverted index file pub fn inverted_index_size(&self) -> Option { if self.available_indexes.len() == 1 && self.inverted_index_available() { @@ -173,6 +181,15 @@ impl FileMeta { None } } + + /// Returns the size of the bloom filter index file + pub fn bloom_filter_index_size(&self) -> Option { + if self.available_indexes.len() == 1 && self.bloom_filter_index_available() { + Some(self.index_file_size) + } else { + None + } + } } /// Handle to a SST file. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1972f3d7abb6..3be87528a3f6 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod bloom_filter; pub(crate) mod fulltext_index; mod indexer; pub(crate) mod intermediate; @@ -40,6 +41,7 @@ use crate::sst::index::inverted_index::creator::InvertedIndexer; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; +pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; /// Output of the index creation. #[derive(Debug, Clone, Default)] diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs new file mode 100644 index 000000000000..2b7b1a629439 --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod applier; + +const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs new file mode 100644 index 000000000000..2493c61179dd --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -0,0 +1,561 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; + +use common_base::range_read::RangeReader; +use common_telemetry::warn; +use datafusion_common::ScalarValue; +use datafusion_expr::expr::InList; +use datafusion_expr::{BinaryExpr, Expr, Operator}; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate}; +use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; +use index::bloom_filter::BloomFilterSegmentLocation; +use object_store::ObjectStore; +use parquet::arrow::arrow_reader::RowSelection; +use parquet::file::metadata::RowGroupMetaData; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; +use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; +use store_api::storage::{ColumnId, RegionId}; + +use super::INDEX_BLOB_TYPE; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; +use crate::cache::index::bloom_filter_index::{ + BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, +}; +use crate::error::{ + ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu, + PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, +}; +use crate::metrics::INDEX_APPLY_ELAPSED; +use crate::row_converter::SortField; +use crate::sst::file::FileId; +use crate::sst::index::inverted_index::codec::IndexValueCodec; +use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +use crate::sst::index::TYPE_BLOOM_FILTER_INDEX; +use crate::sst::location; + +pub(crate) type BloomFilterIndexApplierRef = Arc; + +pub struct BloomFilterIndexApplier { + region_dir: String, + region_id: RegionId, + object_store: ObjectStore, + file_cache: Option, + puffin_manager_factory: PuffinManagerFactory, + puffin_metadata_cache: Option, + bloom_filter_index_cache: Option, + // filter_applier: Box, + filters: HashMap>, +} + +impl BloomFilterIndexApplier { + pub fn new( + region_dir: String, + region_id: RegionId, + object_store: ObjectStore, + // filter_applier: Box, + puffin_manager_factory: PuffinManagerFactory, + filters: HashMap>, + ) -> Self { + Self { + region_dir, + region_id, + object_store, + file_cache: None, + puffin_manager_factory, + puffin_metadata_cache: None, + bloom_filter_index_cache: None, + // filter_applier, + filters, + } + } + + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + + pub fn with_bloom_filter_cache( + mut self, + bloom_filter_index_cache: Option, + ) -> Self { + self.bloom_filter_index_cache = bloom_filter_index_cache; + self + } + + /// Applies bloom filter predicates to the provided SST file and returns a bitmap + /// indicating which segments may contain matching rows + pub async fn apply( + &self, + file_id: FileId, + file_size_hint: Option, + row_group_metas: &[RowGroupMetaData], + basement: &mut BTreeMap>, + ) -> Result<()> { + let _timer = INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) + .start_timer(); + + for (column_id, predicates) in &self.filters { + let mut blob = match self.cached_blob_reader(file_id, *column_id).await { + Ok(Some(puffin_reader)) => puffin_reader, + other => { + if let Err(err) = other { + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") + } + self.remote_blob_reader(file_id, file_size_hint).await? + } + }; + + // Create appropriate reader based on whether we have caching enabled + if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { + let file_size = if let Some(file_size) = file_size_hint { + file_size + } else { + blob.metadata().await.context(MetadataSnafu)?.content_length + }; + let reader = CachedBloomFilterIndexBlobReader::new( + file_id, + file_size, + BloomFilterReaderImpl::new(blob), + bloom_filter_cache.clone(), + ); + self.apply_filters(reader, predicates, row_group_metas, basement) + .await + .context(ApplyBloomFilterIndexSnafu)?; + } else { + let reader = BloomFilterReaderImpl::new(blob); + self.apply_filters(reader, predicates, row_group_metas, basement) + .await + .context(ApplyBloomFilterIndexSnafu)?; + } + } + + Ok(()) + } + + /// Creates a blob reader from the cached index file + async fn cached_blob_reader( + &self, + file_id: FileId, + column_id: ColumnId, + ) -> Result> { + let Some(file_cache) = &self.file_cache else { + return Ok(None); + }; + + let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + if file_cache.get(index_key).await.is_none() { + return Ok(None); + }; + + let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store()); + let puffin_file_name = file_cache.cache_file_path(index_key); + + let reader = puffin_manager + .reader(&puffin_file_name) + .await + .context(PuffinBuildReaderSnafu)? + .blob(&Self::column_blob_name(column_id)) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu)?; + Ok(Some(reader)) + } + + // TODO(ruihang): use the same util with the code in creator + fn column_blob_name(column_id: ColumnId) -> String { + format!("{INDEX_BLOB_TYPE}-{column_id}") + } + + /// Creates a blob reader from the remote index file + async fn remote_blob_reader( + &self, + file_id: FileId, + file_size_hint: Option, + ) -> Result { + let puffin_manager = self + .puffin_manager_factory + .build(self.object_store.clone()) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + + let file_path = location::index_file_path(&self.region_dir, file_id); + puffin_manager + .reader(&file_path) + .await + .context(PuffinBuildReaderSnafu)? + .with_file_size_hint(file_size_hint) + .blob(INDEX_BLOB_TYPE) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu) + } + + async fn apply_filters( + &self, + reader: R, + predicates: &[Predicate], + row_group_metas: &[RowGroupMetaData], + basement: &mut BTreeMap>, + ) -> std::result::Result<(), index::bloom_filter::error::Error> { + let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; + + let mut result: Option> = None; + + for predicate in predicates { + match predicate { + Predicate::InList(in_list) => { + let search_result = applier + .search(&in_list.list, row_group_metas, basement) + .await?; + + result = match result { + Some(result) => { + Some(result.intersection(&search_result).cloned().collect()) + } + None => Some(search_result), + }; + } + } + } + + Ok(()) + } +} + +pub struct BloomFilterIndexApplierBuilder<'a> { + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + puffin_manager_factory: PuffinManagerFactory, + file_cache: Option, + puffin_metadata_cache: Option, + bloom_filter_index_cache: Option, + output: HashMap>, +} + +impl<'a> BloomFilterIndexApplierBuilder<'a> { + pub fn new( + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + puffin_manager_factory: PuffinManagerFactory, + ) -> Self { + Self { + region_dir, + object_store, + metadata, + puffin_manager_factory, + file_cache: None, + puffin_metadata_cache: None, + bloom_filter_index_cache: None, + output: HashMap::default(), + } + } + + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + + pub fn with_bloom_filter_index_cache( + mut self, + bloom_filter_index_cache: Option, + ) -> Self { + self.bloom_filter_index_cache = bloom_filter_index_cache; + self + } + + /// Builds the applier with given filter expressions + pub fn build(mut self, exprs: &[Expr]) -> Result> { + for expr in exprs { + self.traverse_and_collect(expr); + } + + if self.output.is_empty() { + return Ok(None); + } + + // let predicates = self + // .output + // .into_iter() + // .map(|(column_id, predicates)| (column_id.to_string(), predicates)) + // .collect(); + + // let filter_applier = + // BloomFilterApplier::try_from(predicates).context(BuildBloomFilterIndexApplierSnafu)?; + + let applier = BloomFilterIndexApplier::new( + self.region_dir, + self.metadata.region_id, + self.object_store, + // Box::new(filter_applier), + self.puffin_manager_factory, + self.output, + ) + .with_file_cache(self.file_cache) + .with_puffin_metadata_cache(self.puffin_metadata_cache) + .with_bloom_filter_cache(self.bloom_filter_index_cache); + + Ok(Some(applier)) + } + + /// Recursively traverses expressions to collect bloom filter predicates + fn traverse_and_collect(&mut self, expr: &Expr) { + let res = match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::And => { + self.traverse_and_collect(left); + self.traverse_and_collect(right); + Ok(()) + } + Operator::Eq => self.collect_eq(left, right), + _ => Ok(()), + }, + Expr::InList(in_list) => self.collect_in_list(in_list), + _ => Ok(()), + }; + + if let Err(err) = res { + warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}"); + } + } + + /// Helper function to get the column id and type + fn column_id_and_type( + &self, + column_name: &str, + ) -> Result> { + let column = self + .metadata + .column_by_name(column_name) + .context(ColumnNotFoundSnafu { + column: column_name, + })?; + + Ok(Some(( + column.column_id, + column.column_schema.data_type.clone(), + ))) + } + + /// Collects an equality expression (column = value) + fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> { + let (col, lit) = match (left, right) { + (Expr::Column(col), Expr::Literal(lit)) => (col, lit), + (Expr::Literal(lit), Expr::Column(col)) => (col, lit), + _ => return Ok(()), + }; + let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else { + return Ok(()); + }; + let value = encode_lit(lit, data_type)?; + + // Create bloom filter predicate + let mut set = HashSet::new(); + set.insert(value); + let predicate = Predicate::InList(InListPredicate { list: set }); + + // Add to output predicates + self.output.entry(column_id).or_default().push(predicate); + + Ok(()) + } + + /// Collects an in list expression in the form of `column IN (lit, lit, ...)`. + fn collect_in_list(&mut self, in_list: &InList) -> Result<()> { + // Only collect InList predicates if they reference a column + let Expr::Column(column) = &in_list.expr.as_ref() else { + return Ok(()); + }; + if in_list.list.is_empty() || in_list.negated { + return Ok(()); + } + + let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else { + return Ok(()); + }; + + // Convert all non-null literals to predicates + let predicates = in_list + .list + .iter() + .filter_map(Self::nonnull_lit) + .map(|lit| encode_lit(lit, data_type.clone())); + + // Collect successful conversions + let mut valid_predicates = HashSet::new(); + for predicate in predicates { + match predicate { + Ok(p) => { + valid_predicates.insert(p); + } + Err(e) => warn!(e; "Failed to convert value in InList"), + } + } + + if !valid_predicates.is_empty() { + self.output + .entry(column_id) + .or_default() + .push(Predicate::InList(InListPredicate { + list: valid_predicates, + })); + } + + Ok(()) + } + + /// Helper function to get non-null literal value + fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> { + match expr { + Expr::Literal(lit) if !lit.is_null() => Some(lit), + _ => None, + } + } +} + +// TODO(ruihang): extract this and the one under inverted_index into a common util mod. +/// Helper function to encode a literal into bytes. +fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { + let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; + let mut bytes = vec![]; + let field = SortField::new(data_type); + IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; + Ok(bytes) +} + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datafusion_common::Column; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + + use super::*; + + fn test_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column1", + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column2", + ConcreteDataType::int64_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column3", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() + } + + fn test_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + fn column(name: &str) -> Expr { + Expr::Column(Column { + relation: None, + name: name.to_string(), + }) + } + + fn string_lit(s: impl Into) -> Expr { + Expr::Literal(ScalarValue::Utf8(Some(s.into()))) + } + + #[test] + fn test_build_with_exprs() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 1); + + let column_predicates = filters.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 1); + + let expected = encode_lit( + &ScalarValue::Utf8(Some("value1".to_string())), + ConcreteDataType::string_datatype(), + ) + .unwrap(); + match &column_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.iter().next().unwrap(), &expected); + } + } + } +} diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs index d325f735a431..68662dec52e9 100644 --- a/src/mito2/src/sst/index/inverted_index.rs +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -13,7 +13,7 @@ // limitations under the License. pub(crate) mod applier; -mod codec; +pub(crate) mod codec; pub(crate) mod creator; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5527752a8885..6a72b43976f0 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -51,6 +51,7 @@ use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; use crate::row_converter::{McmpRowCodec, SortField}; use crate::sst::file::FileHandle; +use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; @@ -80,6 +81,7 @@ pub struct ParquetReaderBuilder { cache_manager: Option, /// Index appliers. inverted_index_applier: Option, + bloom_filter_index_applier: Option, fulltext_index_applier: Option, /// Expected metadata of the region while reading the SST. /// This is usually the latest metadata of the region. The reader use @@ -102,6 +104,7 @@ impl ParquetReaderBuilder { projection: None, cache_manager: None, inverted_index_applier: None, + bloom_filter_index_applier: None, fulltext_index_applier: None, expected_metadata: None, } @@ -140,6 +143,16 @@ impl ParquetReaderBuilder { self } + /// Attaches the bloom filter index applier to the builder. + #[must_use] + pub(crate) fn bloom_filter_index_applier( + mut self, + index_applier: Option, + ) -> Self { + self.bloom_filter_index_applier = index_applier; + self + } + /// Attaches the fulltext index applier to the builder. #[must_use] pub(crate) fn fulltext_index_applier( @@ -359,6 +372,9 @@ impl ParquetReaderBuilder { self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); } + self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics) + .await; + output } @@ -607,6 +623,52 @@ impl ParquetReaderBuilder { *output = res; } + async fn prune_row_groups_by_bloom_filter( + &self, + parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, + _metrics: &mut ReaderFilterMetrics, + ) -> bool { + let Some(index_applier) = &self.bloom_filter_index_applier else { + return false; + }; + + if !self.file_handle.meta_ref().bloom_filter_index_available() { + return false; + } + + match index_applier + .apply( + self.file_handle.file_id(), + None, + parquet_meta.row_groups(), + output, + ) + .await + { + Ok(output) => output, + Err(err) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); + } + + return false; + } + }; + + true + } + /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to /// a list of row ranges to keep. fn prune_row_groups_by_ranges( From ebfc3d79bfeecbd6acb3b2f88a222e7f85e4d900 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Dec 2024 23:58:07 +0800 Subject: [PATCH 08/13] fix cache key and add unit test Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 10 +- src/index/src/bloom_filter/reader.rs | 14 ++- .../src/cache/index/bloom_filter_index.rs | 91 ++++++++++++++----- .../src/sst/index/bloom_filter/applier.rs | 19 ++-- src/mito2/src/test_util.rs | 1 - 5 files changed, 88 insertions(+), 47 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index c37d8cdbcb56..d4f7b293bc32 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -53,10 +53,10 @@ impl BloomFilterApplier { probes: &HashSet, row_group_metas: &[RowGroupMetaData], basement: &mut BTreeMap>, - ) -> Result> { + ) -> Result<()> { // 0. Fast path - if basement is empty return empty vec if basement.is_empty() { - return Ok(HashSet::new()); + return Ok(()); } // 1. Compute prefix sum for row counts @@ -70,7 +70,6 @@ impl BloomFilterApplier { // 2. Calculate bloom filter segment locations let mut row_groups_to_remove = HashSet::new(); - let mut segment_locations = HashSet::new(); for &row_group_idx in basement.keys() { // TODO(ruihang): support further filter over row selection @@ -99,9 +98,6 @@ impl BloomFilterApplier { } } - if matches { - segment_locations.insert(loc); - } is_any_range_hit |= matches; } if !is_any_range_hit { @@ -114,6 +110,6 @@ impl BloomFilterApplier { basement.remove(&row_group_idx); } - Ok(segment_locations) + Ok(()) } } diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 6dc592100fcf..d8a6b5a74d6f 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -38,7 +38,15 @@ pub trait BloomFilterReader { async fn range_read(&mut self, offset: u64, size: u32) -> Result; /// Reads bunch of ranges from the file. - async fn read_vec(&mut self, ranges: &[Range]) -> Result>; + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + let mut results = Vec::with_capacity(ranges.len()); + for range in ranges { + let size = (range.end - range.start) as u32; + let data = self.range_read(range.start, size).await?; + results.push(data); + } + Ok(results) + } /// Reads the meta information of the bloom filter. async fn metadata(&mut self) -> Result; @@ -79,10 +87,6 @@ impl BloomFilterReader for BloomFilterReaderImpl { .context(IoSnafu) } - async fn read_vec(&mut self, ranges: &[Range]) -> Result> { - self.reader.read_vec(ranges).await.context(IoSnafu) - } - async fn metadata(&mut self) -> Result { let metadata = self.reader.metadata().await.context(IoSnafu)?; let file_size = metadata.content_length; diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index f970f6d8029b..f6d19687ceb9 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; @@ -20,6 +19,7 @@ use bytes::Bytes; use index::bloom_filter::error::Result; use index::bloom_filter::reader::BloomFilterReader; use index::bloom_filter::BloomFilterMeta; +use store_api::storage::ColumnId; use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; @@ -28,7 +28,7 @@ use crate::sst::file::FileId; const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; /// Cache for bloom filter index. -pub type BloomFilterIndexCache = IndexCache; +pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId), BloomFilterMeta>; pub type BloomFilterIndexCacheRef = Arc; impl BloomFilterIndexCache { @@ -46,18 +46,21 @@ impl BloomFilterIndexCache { } /// Calculates weight for bloom filter index metadata. -fn bloom_filter_index_metadata_weight(k: &FileId, _: &Arc) -> u32 { - (k.as_bytes().len() + std::mem::size_of::()) as u32 +fn bloom_filter_index_metadata_weight(k: &(FileId, ColumnId), _: &Arc) -> u32 { + (k.0.as_bytes().len() + + std::mem::size_of::() + + std::mem::size_of::()) as u32 } /// Calculates weight for bloom filter index content. -fn bloom_filter_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { - (k.as_bytes().len() + v.len()) as u32 +fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: &Bytes) -> u32 { + (k.0.as_bytes().len() + std::mem::size_of::() + v.len()) as u32 } /// Bloom filter index blob reader with cache. pub struct CachedBloomFilterIndexBlobReader { file_id: FileId, + column_id: ColumnId, file_size: u64, inner: R, cache: BloomFilterIndexCacheRef, @@ -65,9 +68,16 @@ pub struct CachedBloomFilterIndexBlobReader { impl CachedBloomFilterIndexBlobReader { /// Creates a new bloom filter index blob reader with cache. - pub fn new(file_id: FileId, file_size: u64, inner: R, cache: BloomFilterIndexCacheRef) -> Self { + pub fn new( + file_id: FileId, + column_id: ColumnId, + file_size: u64, + inner: R, + cache: BloomFilterIndexCacheRef, + ) -> Self { Self { file_id, + column_id, file_size, inner, cache, @@ -81,7 +91,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let inner = &mut self.inner; self.cache .get_or_load( - self.file_id, + (self.file_id, self.column_id), self.file_size, offset, size, @@ -91,28 +101,67 @@ impl BloomFilterReader for CachedBloomFilterIndexBl .map(|b| b.into()) } - /// Reads bunch of ranges from the file. - async fn read_vec(&mut self, ranges: &[Range]) -> Result> { - let mut results = Vec::with_capacity(ranges.len()); - for range in ranges { - let size = (range.end - range.start) as u32; - let data = self.range_read(range.start, size).await?; - results.push(data); - } - Ok(results) - } - /// Reads the meta information of the bloom filter. async fn metadata(&mut self) -> Result { - if let Some(cached) = self.cache.get_metadata(self.file_id) { + if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok((*cached).clone()) } else { let meta = self.inner.metadata().await?; self.cache - .put_metadata(self.file_id, Arc::new(meta.clone())); + .put_metadata((self.file_id, self.column_id), Arc::new(meta.clone())); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) } } } + +#[cfg(test)] +mod test { + use rand::{Rng, RngCore}; + + use super::*; + + const FUZZ_REPEAT_TIMES: usize = 100; + + #[test] + fn fuzz_index_calculation() { + let mut rng = rand::thread_rng(); + let mut data = vec![0u8; 1024 * 1024]; + rng.fill_bytes(&mut data); + + for _ in 0..FUZZ_REPEAT_TIMES { + let offset = rng.gen_range(0..data.len() as u64); + let size = rng.gen_range(0..data.len() as u32 - offset as u32); + let page_size: usize = rng.gen_range(1..1024); + + let indexes = + PageKey::generate_page_keys(offset, size, page_size as u64).collect::>(); + let page_num = indexes.len(); + let mut read = Vec::with_capacity(size as usize); + for key in indexes.into_iter() { + let start = key.page_id as usize * page_size; + let page = if start + page_size < data.len() { + &data[start..start + page_size] + } else { + &data[start..] + }; + read.extend_from_slice(page); + } + let expected_range = offset as usize..(offset + size as u64 as u64) as usize; + let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec(); + assert_eq!( + read, + data.get(expected_range).unwrap(), + "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}", + offset, + size, + page_size, + read.len(), + size as usize, + PageKey::calculate_range(offset, size, page_size as u64), + page_num + ); + } + } +} diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index aff25e412bb1..8e1b3b359e10 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -24,7 +24,6 @@ use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate}; use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; -use index::bloom_filter::BloomFilterSegmentLocation; use object_store::ObjectStore; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::RowGroupMetaData; @@ -128,7 +127,8 @@ impl BloomFilterIndexApplier { if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } - self.remote_blob_reader(file_id, file_size_hint).await? + self.remote_blob_reader(file_id, *column_id, file_size_hint) + .await? } }; @@ -141,6 +141,7 @@ impl BloomFilterIndexApplier { }; let reader = CachedBloomFilterIndexBlobReader::new( file_id, + *column_id, file_size, BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), @@ -199,6 +200,7 @@ impl BloomFilterIndexApplier { async fn remote_blob_reader( &self, file_id: FileId, + column_id: ColumnId, file_size_hint: Option, ) -> Result { let puffin_manager = self @@ -212,7 +214,7 @@ impl BloomFilterIndexApplier { .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) - .blob(INDEX_BLOB_TYPE) + .blob(&Self::column_blob_name(column_id)) .await .context(PuffinReadBlobSnafu)? .reader() @@ -229,21 +231,12 @@ impl BloomFilterIndexApplier { ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; - let mut result: Option> = None; - for predicate in predicates { match predicate { Predicate::InList(in_list) => { - let search_result = applier + applier .search(&in_list.list, row_group_metas, basement) .await?; - - result = match result { - Some(result) => { - Some(result.intersection(&search_result).cloned().collect()) - } - None => Some(search_result), - }; } } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 14b4bb4a9109..9eb77d706df3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -643,7 +643,6 @@ impl TestEnv { .await .unwrap(); - let object_store_manager = self.get_object_store_manager().unwrap(); let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr) .await .unwrap(); From 90089908219cb6e60ece1df80bd2332b3728985c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 00:01:06 +0800 Subject: [PATCH 09/13] provide bloom filter index size hint Signed-off-by: Ruihang Xia --- src/mito2/src/sst/parquet/reader.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6a72b43976f0..5931658879ad 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -637,10 +637,11 @@ impl ParquetReaderBuilder { return false; } + let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size(); match index_applier .apply( self.file_handle.file_id(), - None, + file_size_hint, parquet_meta.row_groups(), output, ) From 29e33670b6db63c0849a210019a22a637d458db9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 00:20:35 +0800 Subject: [PATCH 10/13] revert BloomFilterReaderImpl::read_vec Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/reader.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index d8a6b5a74d6f..02085fa671f7 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -87,6 +87,10 @@ impl BloomFilterReader for BloomFilterReaderImpl { .context(IoSnafu) } + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + self.reader.read_vec(ranges).await.context(IoSnafu) + } + async fn metadata(&mut self) -> Result { let metadata = self.reader.metadata().await.context(IoSnafu)?; let file_size = metadata.content_length; From 2d174910f6beb63789063a16b0760cbec1d44c06 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 00:23:17 +0800 Subject: [PATCH 11/13] remove dead code Signed-off-by: Ruihang Xia --- src/mito2/src/sst/index/bloom_filter/applier.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 8e1b3b359e10..b0dfc168d153 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -60,7 +60,6 @@ pub struct BloomFilterIndexApplier { puffin_manager_factory: PuffinManagerFactory, puffin_metadata_cache: Option, bloom_filter_index_cache: Option, - // filter_applier: Box, filters: HashMap>, } @@ -69,7 +68,6 @@ impl BloomFilterIndexApplier { region_dir: String, region_id: RegionId, object_store: ObjectStore, - // filter_applier: Box, puffin_manager_factory: PuffinManagerFactory, filters: HashMap>, ) -> Self { @@ -81,7 +79,6 @@ impl BloomFilterIndexApplier { puffin_manager_factory, puffin_metadata_cache: None, bloom_filter_index_cache: None, - // filter_applier, filters, } } @@ -306,20 +303,10 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { return Ok(None); } - // let predicates = self - // .output - // .into_iter() - // .map(|(column_id, predicates)| (column_id.to_string(), predicates)) - // .collect(); - - // let filter_applier = - // BloomFilterApplier::try_from(predicates).context(BuildBloomFilterIndexApplierSnafu)?; - let applier = BloomFilterIndexApplier::new( self.region_dir, self.metadata.region_id, self.object_store, - // Box::new(filter_applier), self.puffin_manager_factory, self.output, ) From 2fcbbe0a6e796010a7c635989c28938ce89cce91 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 00:32:04 +0800 Subject: [PATCH 12/13] ignore null on eq Signed-off-by: Ruihang Xia --- src/mito2/src/sst/index/bloom_filter/applier.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index b0dfc168d153..8fbbbb1eb350 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -363,6 +363,9 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { (Expr::Literal(lit), Expr::Column(col)) => (col, lit), _ => return Ok(()), }; + if lit.is_null() { + return Ok(()); + } let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else { return Ok(()); }; From 7a2be74a980fdb1fb2f740e124890ca2c356a00f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 00:53:57 +0800 Subject: [PATCH 13/13] add more tests and fix bloom filter logic Signed-off-by: Ruihang Xia --- src/index/src/bloom_filter/applier.rs | 26 ++- src/mito2/src/error.rs | 11 +- .../src/sst/index/bloom_filter/applier.rs | 178 ++++++++++++++++++ 3 files changed, 201 insertions(+), 14 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index d4f7b293bc32..2750cbb92b6b 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -48,6 +48,21 @@ impl BloomFilterApplier { Ok(Self { reader, meta }) } + /// Searches for matching row groups using bloom filters. + /// + /// This method applies bloom filter index to eliminate row groups that definitely + /// don't contain the searched values. It works by: + /// + /// 1. Computing prefix sums for row counts + /// 2. Calculating bloom filter segment locations for each row group + /// 1. A row group may span multiple bloom filter segments + /// 3. Probing bloom filter segments + /// 4. Removing non-matching row groups from the basement + /// 1. If a row group doesn't match any bloom filter segment with any probe, it is removed + /// + /// # Note + /// The method modifies the `basement` map in-place by removing row groups that + /// don't match the bloom filter criteria. pub async fn search( &mut self, probes: &HashSet, @@ -89,16 +104,19 @@ impl BloomFilterApplier { }; let bloom = self.reader.bloom_filter(&loc).await?; - // Check if all probes exist in bloom filter - let mut matches = true; + // Check if any probe exists in bloom filter + let mut matches = false; for probe in probes { - if !bloom.contains(probe) { - matches = false; + if bloom.contains(probe) { + matches = true; break; } } is_any_range_hit |= matches; + if matches { + break; + } } if !is_any_range_hit { row_groups_to_remove.insert(row_group_idx); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1557b764e053..0820d99337ec 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -562,13 +562,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to build bloom filter index applier"))] - BuildBloomFilterIndexApplier { - source: index::bloom_filter::error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to convert value"))] ConvertValue { source: datatypes::error::Error, @@ -1036,9 +1029,7 @@ impl ErrorExt for Error { EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), - BuildBloomFilterIndexApplier { source, .. } | ApplyBloomFilterIndex { source, .. } => { - source.status_code() - } + ApplyBloomFilterIndex { source, .. } => source.status_code(), BuildIndexApplier { source, .. } | PushIndexValue { source, .. } | ApplyInvertedIndex { source, .. } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 8fbbbb1eb350..3476ec097243 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -541,4 +541,182 @@ mod tests { } } } + + fn int64_lit(i: i64) -> Expr { + Expr::Literal(ScalarValue::Int64(Some(i))) + } + + #[test] + fn test_build_with_in_list() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2), int64_lit(3)], + negated: false, + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&2).unwrap(); + assert_eq!(column_predicates.len(), 1); + + match &column_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 3); + } + } + } + + #[test] + fn test_build_with_and_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })), + op: Operator::And, + right: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column2")), + op: Operator::Eq, + right: Box::new(int64_lit(42)), + })), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 2); + assert!(filters.contains_key(&1)); + assert!(filters.contains_key(&2)); + } + + #[test] + fn test_build_with_null_values() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(None))), + }), + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![ + int64_lit(1), + Expr::Literal(ScalarValue::Int64(None)), + int64_lit(3), + ], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert!(!filters.contains_key(&1)); // Null equality should be ignored + let column2_predicates = filters.get(&2).unwrap(); + match &column2_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 2); // Only non-null values should be included + } + } + } + + #[test] + fn test_build_with_invalid_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + // Non-equality operator + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Gt, + right: Box::new(string_lit("value1")), + }), + // Non-existent column + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("non_existent")), + op: Operator::Eq, + right: Box::new(string_lit("value")), + }), + // Negated IN list + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2)], + negated: true, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_build_with_multiple_predicates_same_column() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + }), + Expr::InList(InList { + expr: Box::new(column("column1")), + list: vec![string_lit("value2"), string_lit("value3")], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 2); + } }