Skip to content

Commit

Permalink
feat(bloom-filter): integrate indexer with mito2 (#5236)
Browse files Browse the repository at this point in the history
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <[email protected]>

* rename skippingindextype

Signed-off-by: Zhenchi <[email protected]>

* address comments

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Dec 25, 2024
1 parent 039989f commit a9f2191
Show file tree
Hide file tree
Showing 22 changed files with 1,032 additions and 254 deletions.
2 changes: 1 addition & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R
use crate::prelude::ConcreteDataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
Expand Down
12 changes: 6 additions & 6 deletions src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ pub struct SkippingIndexOptions {
pub granularity: u32,
/// The type of the skip index.
#[serde(default)]
pub index_type: SkipIndexType,
pub index_type: SkippingIndexType,
}

impl fmt::Display for SkippingIndexOptions {
Expand All @@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions {

/// Skip index types.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum SkipIndexType {
pub enum SkippingIndexType {
#[default]
BloomFilter,
}

impl fmt::Display for SkipIndexType {
impl fmt::Display for SkippingIndexType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SkipIndexType::BloomFilter => write!(f, "BLOOM"),
SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
}
}
}
Expand All @@ -587,15 +587,15 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
// Parse index type with default value BloomFilter
let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
Some(typ) => match typ.to_ascii_uppercase().as_str() {
"BLOOM" => SkipIndexType::BloomFilter,
"BLOOM" => SkippingIndexType::BloomFilter,
_ => {
return error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
}
.fail();
}
},
None => SkipIndexType::default(),
None => SkippingIndexType::default(),
};

Ok(SkippingIndexOptions {
Expand Down
6 changes: 3 additions & 3 deletions src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BloomFilterCreator {
/// `rows_per_segment` <= 0
pub fn new(
rows_per_segment: usize,
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
Expand Down Expand Up @@ -252,7 +252,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down Expand Up @@ -322,7 +322,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down
8 changes: 4 additions & 4 deletions src/index/src/bloom_filter/creator/finalize_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct FinalizedBloomFilterStorage {
intermediate_prefix: String,

/// The provider for intermediate Bloom filter files.
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,

/// The memory usage of the in-memory Bloom filters.
memory_usage: usize,
Expand All @@ -59,7 +59,7 @@ pub struct FinalizedBloomFilterStorage {
impl FinalizedBloomFilterStorage {
/// Creates a new `FinalizedBloomFilterStorage`.
pub fn new(
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
Expand Down Expand Up @@ -132,7 +132,7 @@ impl FinalizedBloomFilterStorage {
/// Drains the storage and returns a stream of finalized Bloom filter segments.
pub async fn drain(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + '_>>> {
) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>> {
// FAST PATH: memory only
if self.intermediate_file_id_counter == 0 {
return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
Expand Down Expand Up @@ -257,7 +257,7 @@ mod tests {

let global_memory_usage = Arc::new(AtomicUsize::new(0));
let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
let provider = Box::new(mock_provider);
let provider = Arc::new(mock_provider);
let mut storage = FinalizedBloomFilterStorage::new(
provider,
global_memory_usage.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/bloom_filter/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
let mut writer = Cursor::new(vec![]);
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down
14 changes: 2 additions & 12 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_telemetry::{info, warn};
use common_time::TimeToLive;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -41,7 +40,7 @@ use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::file::{FileMeta, IndexType};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
Expand Down Expand Up @@ -336,16 +335,7 @@ impl Compactor for DefaultCompactor {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: {
let mut indexes = SmallVec::new();
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
Expand Down
25 changes: 22 additions & 3 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to retrieve fulltext options from column metadata"))]
FulltextOptions {
#[snafu(display("Failed to retrieve index options from column metadata"))]
IndexOptions {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
Expand Down Expand Up @@ -904,6 +904,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to push value to bloom filter"))]
PushBloomFilterValue {
source: index::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to finish bloom filter"))]
BloomFilterFinish {
source: index::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -1029,7 +1043,7 @@ impl ErrorExt for Error {
UnsupportedOperation { .. } => StatusCode::Unsupported,
RemoteCompaction { .. } => StatusCode::Unexpected,

FulltextOptions { source, .. } => source.status_code(),
IndexOptions { source, .. } => source.status_code(),
CreateFulltextCreator { source, .. } => source.status_code(),
CastVector { source, .. } => source.status_code(),
FulltextPushText { source, .. }
Expand All @@ -1039,7 +1053,12 @@ impl ErrorExt for Error {
RegionBusy { .. } => StatusCode::RegionBusy,
GetSchemaMetadata { source, .. } => source.status_code(),
Timeout { .. } => StatusCode::Cancelled,

DecodeArrowRowGroup { .. } => StatusCode::Internal,

PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => {
source.status_code()
}
}
}

Expand Down
14 changes: 2 additions & 12 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
Expand All @@ -45,7 +44,7 @@ use crate::request::{
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta, IndexType};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

Expand Down Expand Up @@ -378,16 +377,7 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: {
let mut indexes = SmallVec::new();
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub enum IndexType {
InvertedIndex,
/// Full-text index.
FulltextIndex,
/// Bloom filter.
BloomFilter,
}

impl FileMeta {
Expand All @@ -156,6 +158,11 @@ impl FileMeta {
self.available_indexes.contains(&IndexType::FulltextIndex)
}

/// Reports whether [`IndexType::BloomFilter`] is listed among this file's available indexes.
pub fn bloom_filter_available(&self) -> bool {
    self.available_indexes
        .iter()
        .any(|index| matches!(index, IndexType::BloomFilter))
}

/// Returns the size of the inverted index file
pub fn inverted_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.inverted_index_available() {
Expand Down
Loading

0 comments on commit a9f2191

Please sign in to comment.