From a9f21915efacacd7ac10d76bf2caa342776f2490 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 22:30:07 +0800 Subject: [PATCH] feat(bloom-filter): integrate indexer with mito2 (#5236) * feat(bloom-filter): integrate indexer with mito2 Signed-off-by: Zhenchi * rename skippingindextype Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/datatypes/src/schema.rs | 2 +- src/datatypes/src/schema/column_schema.rs | 12 +- src/index/src/bloom_filter/creator.rs | 6 +- .../bloom_filter/creator/finalize_segment.rs | 8 +- src/index/src/bloom_filter/reader.rs | 2 +- src/mito2/src/compaction/compactor.rs | 14 +- src/mito2/src/error.rs | 25 +- src/mito2/src/flush.rs | 14 +- src/mito2/src/sst/file.rs | 7 + src/mito2/src/sst/index.rs | 151 ++++- src/mito2/src/sst/index/bloom_filter.rs | 17 + .../src/sst/index/bloom_filter/creator.rs | 530 ++++++++++++++++++ .../sst/index/{inverted_index => }/codec.rs | 0 .../src/sst/index/fulltext_index/creator.rs | 16 +- src/mito2/src/sst/index/indexer/abort.rs | 26 +- src/mito2/src/sst/index/indexer/finish.rs | 73 ++- src/mito2/src/sst/index/indexer/update.rs | 32 +- src/mito2/src/sst/index/intermediate.rs | 157 ++++++ src/mito2/src/sst/index/inverted_index.rs | 1 - .../index/inverted_index/applier/builder.rs | 2 +- .../src/sst/index/inverted_index/creator.rs | 9 +- .../inverted_index/creator/temp_provider.rs | 182 ------ 22 files changed, 1032 insertions(+), 254 deletions(-) create mode 100644 src/mito2/src/sst/index/bloom_filter.rs create mode 100644 src/mito2/src/sst/index/bloom_filter/creator.rs rename src/mito2/src/sst/index/{inverted_index => }/codec.rs (100%) delete mode 100644 src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index c537a4608b42..19f3c6e55fb1 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -29,7 +29,7 @@ use crate::error::{self, 
DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R use crate::prelude::ConcreteDataType; pub use crate::schema::column_schema::{ ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions, - COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, + SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY, diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 7a96ab5e2bf2..74e066adc7b4 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -543,7 +543,7 @@ pub struct SkippingIndexOptions { pub granularity: u32, /// The type of the skip index. #[serde(default)] - pub index_type: SkipIndexType, + pub index_type: SkippingIndexType, } impl fmt::Display for SkippingIndexOptions { @@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions { /// Skip index types. 
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)] -pub enum SkipIndexType { +pub enum SkippingIndexType { #[default] BloomFilter, } -impl fmt::Display for SkipIndexType { +impl fmt::Display for SkippingIndexType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - SkipIndexType::BloomFilter => write!(f, "BLOOM"), + SkippingIndexType::BloomFilter => write!(f, "BLOOM"), } } } @@ -587,7 +587,7 @@ impl TryFrom> for SkippingIndexOptions { // Parse index type with default value BloomFilter let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) { Some(typ) => match typ.to_ascii_uppercase().as_str() { - "BLOOM" => SkipIndexType::BloomFilter, + "BLOOM" => SkippingIndexType::BloomFilter, _ => { return error::InvalidSkippingIndexOptionSnafu { msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"), @@ -595,7 +595,7 @@ impl TryFrom> for SkippingIndexOptions { .fail(); } }, - None => SkipIndexType::default(), + None => SkippingIndexType::default(), }; Ok(SkippingIndexOptions { diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index f8c54239645b..da95334782a7 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -73,7 +73,7 @@ impl BloomFilterCreator { /// `rows_per_segment` <= 0 pub fn new( rows_per_segment: usize, - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -252,7 +252,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); @@ -322,7 +322,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + 
Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs index 091b1ee6aac0..e97652f5fc6a 100644 --- a/src/index/src/bloom_filter/creator/finalize_segment.rs +++ b/src/index/src/bloom_filter/creator/finalize_segment.rs @@ -43,7 +43,7 @@ pub struct FinalizedBloomFilterStorage { intermediate_prefix: String, /// The provider for intermediate Bloom filter files. - intermediate_provider: Box, + intermediate_provider: Arc, /// The memory usage of the in-memory Bloom filters. memory_usage: usize, @@ -59,7 +59,7 @@ pub struct FinalizedBloomFilterStorage { impl FinalizedBloomFilterStorage { /// Creates a new `FinalizedBloomFilterStorage`. pub fn new( - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -132,7 +132,7 @@ impl FinalizedBloomFilterStorage { /// Drains the storage and returns a stream of finalized Bloom filter segments. 
pub async fn drain( &mut self, - ) -> Result> + '_>>> { + ) -> Result> + Send + '_>>> { // FAST PATH: memory only if self.intermediate_file_id_counter == 0 { return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok)))); @@ -257,7 +257,7 @@ mod tests { let global_memory_usage = Arc::new(AtomicUsize::new(0)); let global_memory_usage_threshold = Some(1024 * 1024); // 1MB - let provider = Box::new(mock_provider); + let provider = Arc::new(mock_provider); let mut storage = FinalizedBloomFilterStorage::new( provider, global_memory_usage.clone(), diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 788afe033124..6dc592100fcf 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -190,7 +190,7 @@ mod tests { let mut writer = Cursor::new(vec![]); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e2499140fd61..e7d5e779b675 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -21,7 +21,6 @@ use common_telemetry::{info, warn}; use common_time::TimeToLive; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -41,7 +40,7 @@ use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; -use crate::sst::file::{FileMeta, IndexType}; +use crate::sst::file::FileMeta; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use 
crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -336,16 +335,7 @@ impl Compactor for DefaultCompactor { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1baffd4a7fa1..3a6f368bacd8 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -816,8 +816,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to retrieve fulltext options from column metadata"))] - FulltextOptions { + #[snafu(display("Failed to retrieve index options from column metadata"))] + IndexOptions { #[snafu(implicit)] location: Location, source: datatypes::error::Error, @@ -904,6 +904,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to push value to bloom filter"))] + PushBloomFilterValue { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to finish bloom filter"))] + BloomFilterFinish { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1029,7 +1043,7 @@ impl ErrorExt for Error { UnsupportedOperation { .. } => StatusCode::Unsupported, RemoteCompaction { .. } => StatusCode::Unexpected, - FulltextOptions { source, .. } => source.status_code(), + IndexOptions { source, .. } => source.status_code(), CreateFulltextCreator { source, .. 
} => source.status_code(), CastVector { source, .. } => source.status_code(), FulltextPushText { source, .. } @@ -1039,7 +1053,12 @@ impl ErrorExt for Error { RegionBusy { .. } => StatusCode::RegionBusy, GetSchemaMetadata { source, .. } => source.status_code(), Timeout { .. } => StatusCode::Cancelled, + DecodeArrowRowGroup { .. } => StatusCode::Internal, + + PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => { + source.status_code() + } } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index b522f225f9f0..64a739068ad9 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{debug, error, info, trace}; -use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use strum::IntoStaticStr; @@ -45,7 +44,7 @@ use crate::request::{ SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; -use crate::sst::file::{FileId, FileMeta, IndexType}; +use crate::sst::file::{FileId, FileMeta}; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -378,16 +377,7 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 5a9932ab433b..844d3e5d08f8 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -143,6 +143,8 @@ pub enum IndexType { 
InvertedIndex, /// Full-text index. FulltextIndex, + /// Bloom filter. + BloomFilter, } impl FileMeta { @@ -156,6 +158,11 @@ impl FileMeta { self.available_indexes.contains(&IndexType::FulltextIndex) } + /// Returns true if the file has a bloom filter + pub fn bloom_filter_available(&self) -> bool { + self.available_indexes.contains(&IndexType::BloomFilter) + } + /// Returns the size of the inverted index file pub fn inverted_index_size(&self) -> Option { if self.available_indexes.len() == 1 && self.inverted_index_available() { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1972f3d7abb6..b6eac91e56d9 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod bloom_filter; +mod codec; pub(crate) mod fulltext_index; mod indexer; pub(crate) mod intermediate; @@ -22,8 +24,10 @@ pub(crate) mod store; use std::num::NonZeroUsize; +use bloom_filter::creator::BloomFilterIndexer; use common_telemetry::{debug, warn}; use puffin_manager::SstPuffinManager; +use smallvec::SmallVec; use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; @@ -33,13 +37,14 @@ use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; -use crate::sst::file::FileId; +use crate::sst::file::{FileId, IndexType}; use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::inverted_index::creator::InvertedIndexer; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; +pub(crate) const TYPE_BLOOM_FILTER: &str = "bloom_filter"; /// Output of the index creation. 
#[derive(Debug, Clone, Default)] @@ -50,6 +55,24 @@ pub struct IndexOutput { pub inverted_index: InvertedIndexOutput, /// Fulltext index output. pub fulltext_index: FulltextIndexOutput, + /// Bloom filter output. + pub bloom_filter: BloomFilterOutput, +} + +impl IndexOutput { + pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> { + let mut indexes = SmallVec::new(); + if self.inverted_index.is_available() { + indexes.push(IndexType::InvertedIndex); + } + if self.fulltext_index.is_available() { + indexes.push(IndexType::FulltextIndex); + } + if self.bloom_filter.is_available() { + indexes.push(IndexType::BloomFilter); + } + indexes + } } /// Base output of the index creation. @@ -73,6 +96,8 @@ impl IndexBaseOutput { pub type InvertedIndexOutput = IndexBaseOutput; /// Output of the fulltext index creation. pub type FulltextIndexOutput = IndexBaseOutput; +/// Output of the bloom filter creation. +pub type BloomFilterOutput = IndexBaseOutput; /// The index creator that hides the error handling details. 
#[derive(Default)] @@ -86,6 +111,8 @@ pub struct Indexer { last_mem_inverted_index: usize, fulltext_indexer: Option<FulltextIndexer>, last_mem_fulltext_index: usize, + bloom_filter_indexer: Option<BloomFilterIndexer>, + last_mem_bloom_filter: usize, } impl Indexer { @@ -129,6 +156,15 @@ impl Indexer { .with_label_values(&[TYPE_FULLTEXT_INDEX]) .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64); self.last_mem_fulltext_index = fulltext_mem; + + let bloom_filter_mem = self + .bloom_filter_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()); + INDEX_CREATE_MEMORY_USAGE + .with_label_values(&[TYPE_BLOOM_FILTER]) + .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64); + self.last_mem_bloom_filter = bloom_filter_mem; } } @@ -158,7 +194,11 @@ impl<'a> IndexerBuilder<'a> { indexer.inverted_indexer = self.build_inverted_indexer(); indexer.fulltext_indexer = self.build_fulltext_indexer().await; - if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() { + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + if indexer.inverted_indexer.is_none() + && indexer.fulltext_indexer.is_none() + && indexer.bloom_filter_indexer.is_none() + { indexer.abort().await; return Indexer::default(); } @@ -266,7 +306,7 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {}", + "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", self.metadata.region_id, self.file_id, err ); } else { @@ -278,6 +318,53 @@ impl<'a> IndexerBuilder<'a> { None } + + fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> { + let create = true; // TODO(zhongzc): add config for bloom filter + + if !create { + debug!( + "Skip creating bloom filter due to config, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return None; + } + + let mem_limit = Some(16 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let indexer = 
BloomFilterIndexer::new( + self.file_id, + self.metadata, + self.intermediate_manager.clone(), + mem_limit, + ); + + let err = match indexer { + Ok(indexer) => { + if indexer.is_none() { + debug!( + "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + return indexer; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.metadata.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to create bloom filter, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + + None + } } #[cfg(test)] @@ -286,7 +373,9 @@ mod tests { use api::v1::SemanticType; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, FulltextOptions}; + use datatypes::schema::{ + ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType, + }; use object_store::services::Memory; use object_store::ObjectStore; use puffin_manager::PuffinManagerFactory; @@ -298,12 +387,14 @@ mod tests { struct MetaConfig { with_tag: bool, with_fulltext: bool, + with_skipping_bloom: bool, } fn mock_region_metadata( MetaConfig { with_tag, with_fulltext, + with_skipping_bloom, }: MetaConfig, ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); @@ -354,6 +445,24 @@ mod tests { builder.push_column_metadata(column); } + if with_skipping_bloom { + let column_schema = + ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false) + .with_skipping_options(SkippingIndexOptions { + granularity: 42, + index_type: SkippingIndexType::BloomFilter, + }) + .unwrap(); + + let column = ColumnMetadata { + column_schema, + semantic_type: SemanticType::Field, + column_id: 5, + }; + + builder.push_column_metadata(column); + } + Arc::new(builder.build().unwrap()) } @@ -374,6 +483,7 @@ mod tests { let metadata = 
mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -392,6 +502,7 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); } #[tokio::test] @@ -403,6 +514,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -456,6 +568,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: false, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -474,10 +587,12 @@ mod tests { assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: false, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -486,7 +601,7 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), @@ -496,6 +611,31 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + with_skipping_bloom: false, + }); + let indexer = IndexerBuilder { + op_type: OperationType::Flush, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: 
factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -507,6 +647,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs new file mode 100644 index 000000000000..347195a3b16b --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod creator; + +const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs new file mode 100644 index 000000000000..8c56800f47e7 --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -0,0 +1,530 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use common_telemetry::warn; +use datatypes::schema::SkippingIndexType; +use index::bloom_filter::creator::BloomFilterCreator; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::error::{ + BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, + PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, +}; +use crate::read::Batch; +use crate::row_converter::SortField; +use crate::sst::file::FileId; +use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; +use crate::sst::index::TYPE_BLOOM_FILTER; + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// The indexer for the bloom filter index. +pub struct BloomFilterIndexer { + /// The bloom filter creators. + creators: HashMap<ColumnId, BloomFilterCreator>, + + /// The provider for intermediate files. + temp_file_provider: Arc<TempFileProvider>, + + /// Codec for decoding primary keys. 
+ codec: IndexValuesCodec, + + /// Whether the indexing process has been aborted. + aborted: bool, + + /// The statistics of the indexer. + stats: Statistics, + + /// The global memory usage. + global_memory_usage: Arc<AtomicUsize>, +} + +impl BloomFilterIndexer { + /// Creates a new bloom filter indexer. + pub fn new( + sst_file_id: FileId, + metadata: &RegionMetadataRef, + intermediate_manager: IntermediateManager, + memory_usage_threshold: Option<usize>, + ) -> Result<Option<Self>> { + let mut creators = HashMap::new(); + + let temp_file_provider = Arc::new(TempFileProvider::new( + IntermediateLocation::new(&metadata.region_id, &sst_file_id), + intermediate_manager, + )); + let global_memory_usage = Arc::new(AtomicUsize::new(0)); + + for column in &metadata.column_metadatas { + let options = + column + .column_schema + .skipping_index_options() + .context(IndexOptionsSnafu { + column_name: &column.column_schema.name, + })?; + + let options = match options { + Some(options) if options.index_type == SkippingIndexType::BloomFilter => options, + _ => continue, + }; + + let creator = BloomFilterCreator::new( + options.granularity as _, + temp_file_provider.clone(), + global_memory_usage.clone(), + memory_usage_threshold, + ); + creators.insert(column.column_id, creator); + } + + if creators.is_empty() { + return Ok(None); + } + + let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); + let indexer = Self { + creators, + temp_file_provider, + codec, + aborted: false, + stats: Statistics::new(TYPE_BLOOM_FILTER), + global_memory_usage, + }; + Ok(Some(indexer)) + } + + /// Updates index with a batch of rows. + /// Garbage will be cleaned up if failed to update. 
+ /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn update(&mut self, batch: &Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() { + return Ok(()); + } + + if let Err(update_err) = self.do_update(batch).await { + // clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err:?}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Finishes index creation and cleans up garbage. + /// Returns the number of rows and bytes written. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.stats.row_count() == 0 { + // no IO is performed, no garbage to clean up, just return + return Ok((0, 0)); + } + + let finish_res = self.do_finish(puffin_writer).await; + // clean up garbage no matter finish successfully or not + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err:?}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + + finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) + } + + /// Aborts index creation and clean up garbage. 
+ /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + self.aborted = true; + + self.do_cleanup().await + } + + async fn do_update(&mut self, batch: &Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + // Tags + for ((col_id, _), field, value) in self.codec.decode(batch.primary_key())? { + let Some(creator) = self.creators.get_mut(col_id) else { + continue; + }; + let elems = value + .map(|v| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(v.as_value_ref(), field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + creator + .push_n_row_elems(n, elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + + // Fields + for field in batch.fields() { + let Some(creator) = self.creators.get_mut(&field.column_id) else { + continue; + }; + + let sort_field = SortField::new(field.data.data_type()); + for i in 0..n { + let value = field.data.get_ref(i); + let elems = (!value.is_null()) + .then(|| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + + creator + .push_row_elems(elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + } + Ok(()) + } + + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { + let mut guard = self.stats.record_finish(); + + for (id, creator) in &mut self.creators { + let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?; + guard.inc_byte_count(written_bytes); + } + + Ok(()) + } + + async fn do_cleanup(&mut self) -> Result<()> { + let mut _guard = self.stats.record_cleanup(); + + self.creators.clear(); + self.temp_file_provider.cleanup().await + } + + /// Data flow of 
finishing index: + /// + /// ```text + /// (In Memory Buffer) + /// ┌──────┐ + /// ┌─────────────┐ │ PIPE │ + /// │ │ write index data │ │ + /// │ IndexWriter ├──────────────────►│ tx │ + /// │ │ │ │ + /// └─────────────┘ │ │ + /// ┌─────────────────┤ rx │ + /// ┌─────────────┐ │ read as blob └──────┘ + /// │ │ │ + /// │ PuffinWriter├─┤ + /// │ │ │ copy to file ┌──────┐ + /// └─────────────┘ └────────────────►│ File │ + /// └──────┘ + /// ``` + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish_single_creator( + col_id: &ColumnId, + creator: &mut BloomFilterCreator, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<ByteCount> { + let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + + let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id); + let (index_finish, puffin_add_blob) = futures::join!( + creator.finish(tx.compat_write()), + puffin_writer.put_blob(&blob_name, rx.compat(), PutOptions::default()) + ); + + match ( + puffin_add_blob.context(PuffinAddBlobSnafu), + index_finish.context(BloomFilterFinishSnafu), + ) { + (Err(e1), Err(e2)) => BiErrorsSnafu { + first: Box::new(e1), + second: Box::new(e2), + } + .fail()?, + + (Ok(_), e @ Err(_)) => e?, + (e @ Err(_), Ok(_)) => e.map(|_| ())?, + (Ok(written_bytes), Ok(_)) => { + return Ok(written_bytes); + } + } + + Ok(0) + } + + /// Returns the memory usage of the indexer. + pub fn memory_usage(&self) -> usize { + self.global_memory_usage + .load(std::sync::atomic::Ordering::Relaxed) + } + + /// Returns the column ids to be indexed. 
+ pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> { + self.creators.keys().copied() + } +} + +#[cfg(test)] +mod tests { + use std::iter; + + use api::v1::SemanticType; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, SkippingIndexOptions}; + use datatypes::value::ValueRef; + use datatypes::vectors::{UInt64Vector, UInt8Vector}; + use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; + use object_store::services::Memory; + use object_store::ObjectStore; + use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + use crate::read::BatchColumn; + use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + use crate::sst::index::puffin_manager::PuffinManagerFactory; + + fn mock_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager { + IntermediateManager::init_fs(path).await.unwrap() + } + + /// tag_str: + /// - type: string + /// - index: bloom filter + /// - granularity: 2 + /// - column_id: 1 + /// + /// ts: + /// - type: timestamp + /// - index: time index + /// - column_id: 2 + /// + /// field_u64: + /// - type: uint64 + /// - index: bloom filter + /// - granularity: 4 + /// - column_id: 3 + fn mock_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_str", + ConcreteDataType::string_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions { + index_type: SkippingIndexType::BloomFilter, + granularity: 2, + }) + .unwrap(), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + 
ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_u64", + ConcreteDataType::uint64_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions { + index_type: SkippingIndexType::BloomFilter, + granularity: 4, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .primary_key(vec![1]); + + Arc::new(builder.build().unwrap()) + } + + fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch { + let fields = vec![SortField::new(ConcreteDataType::string_datatype())]; + let codec = McmpRowCodec::new(fields); + let row: [ValueRef; 1] = [str_tag.as_ref().into()]; + let primary_key = codec.encode(row.into_iter()).unwrap(); + + let u64_field = BatchColumn { + column_id: 3, + data: Arc::new(UInt64Vector::from_iter_values(u64_field)), + }; + let num_rows = u64_field.data.len(); + + Batch::new( + primary_key, + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt8Vector::from_iter_values( + iter::repeat(1).take(num_rows), + )), + vec![u64_field], + ) + .unwrap() + } + + #[tokio::test] + async fn test_bloom_filter_indexer() { + let prefix = "test_bloom_filter_indexer_"; + let object_store = mock_object_store(); + let intm_mgr = new_intm_mgr(prefix).await; + let region_metadata = mock_region_metadata(); + let memory_usage_threshold = Some(1024); + + let mut indexer = BloomFilterIndexer::new( + FileId::random(), + &region_metadata, + intm_mgr, + memory_usage_threshold, + ) + .unwrap() + .unwrap(); + + // push 20 rows + let batch = new_batch("tag1", 0..10); + indexer.update(&batch).await.unwrap(); + + let batch = new_batch("tag2", 10..20); + indexer.update(&batch).await.unwrap(); + + let (_d, factory) =
PuffinManagerFactory::new_for_test_async(prefix).await; + let puffin_manager = factory.build(object_store); + + let index_file_name = "index_file"; + let mut puffin_writer = puffin_manager.writer(index_file_name).await.unwrap(); + let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap(); + assert_eq!(row_count, 20); + assert!(byte_count > 0); + puffin_writer.finish().await.unwrap(); + + let puffin_reader = puffin_manager.reader(index_file_name).await.unwrap(); + + // tag_str + { + let blob_guard = puffin_reader + .blob("greptime-bloom-filter-v1-1") + .await + .unwrap(); + let reader = blob_guard.reader().await.unwrap(); + let mut bloom_filter = BloomFilterReaderImpl::new(reader); + let metadata = bloom_filter.metadata().await.unwrap(); + + assert_eq!(metadata.bloom_filter_segments.len(), 10); + for i in 0..5 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i]) + .await + .unwrap(); + assert!(bf.contains(b"tag1")); + } + for i in 5..10 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i]) + .await + .unwrap(); + assert!(bf.contains(b"tag2")); + } + } + + // field_u64 + { + let sort_field = SortField::new(ConcreteDataType::uint64_datatype()); + + let blob_guard = puffin_reader + .blob("greptime-bloom-filter-v1-3") + .await + .unwrap(); + let reader = blob_guard.reader().await.unwrap(); + let mut bloom_filter = BloomFilterReaderImpl::new(reader); + let metadata = bloom_filter.metadata().await.unwrap(); + + assert_eq!(metadata.bloom_filter_segments.len(), 5); + for i in 0u64..20 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i as usize / 4]) + .await + .unwrap(); + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) + .unwrap(); + + assert!(bf.contains(&buf)); + } + } + } +} diff --git a/src/mito2/src/sst/index/inverted_index/codec.rs b/src/mito2/src/sst/index/codec.rs similarity index 100% rename from 
src/mito2/src/sst/index/inverted_index/codec.rs rename to src/mito2/src/sst/index/codec.rs diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 416e39d9dd5e..41fa15bd7c72 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -27,8 +27,7 @@ use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::error::{ CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu, - FulltextOptionsSnafu, FulltextPushTextSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, - Result, + FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result, }; use crate::read::Batch; use crate::sst::file::FileId; @@ -61,13 +60,12 @@ impl FulltextIndexer { let mut creators = HashMap::new(); for column in &metadata.column_metadatas { - let options = - column - .column_schema - .fulltext_options() - .context(FulltextOptionsSnafu { - column_name: &column.column_schema.name, - })?; + let options = column + .column_schema + .fulltext_options() + .context(IndexOptionsSnafu { + column_name: &column.column_schema.name, + })?; // Relax the type constraint here as many types can be casted to string. 
diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 68034d48fb29..5b29009a033b 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -20,6 +20,7 @@ impl Indexer { pub(crate) async fn do_abort(&mut self) { self.do_abort_inverted_index().await; self.do_abort_fulltext_index().await; + self.do_abort_bloom_filter().await; self.puffin_manager = None; } @@ -33,7 +34,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -54,7 +55,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -64,4 +65,25 @@ impl Indexer { ); } } + + async fn do_abort_bloom_filter(&mut self) { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return; + }; + let Err(err) = indexer.abort().await else { + return; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index a0157a9b66f4..025eead758ff 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -15,11 +15,14 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; +use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; use 
crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; -use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput}; +use crate::sst::index::{ + BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput, +}; impl Indexer { pub(crate) async fn do_finish(&mut self) -> IndexOutput { @@ -46,6 +49,12 @@ impl Indexer { return IndexOutput::default(); } + let success = self.do_finish_bloom_filter(&mut writer, &mut output).await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + output.file_size = self.do_finish_puffin_writer(writer).await; output } @@ -60,7 +69,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -81,7 +90,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -119,7 +128,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -156,7 +165,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -169,6 +178,43 @@ impl Indexer { false } + async fn do_finish_bloom_filter( 
+ &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return true; + }; + + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_bloom_filter_output( + &mut index_output.bloom_filter, + row_count, + byte_count, + &indexer, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + fn fill_inverted_index_output( &mut self, output: &mut InvertedIndexOutput, @@ -202,4 +248,21 @@ impl Indexer { output.row_count = row_count; output.columns = indexer.column_ids().collect(); } + + fn fill_bloom_filter_output( + &mut self, + output: &mut BloomFilterOutput, + row_count: RowCount, + byte_count: ByteCount, + indexer: &BloomFilterIndexer, + ) { + debug!( + "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = indexer.column_ids().collect(); + } } diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index c08f171bb415..c2ab33f0e13a 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -29,6 +29,9 @@ impl Indexer { if !self.do_update_fulltext_index(batch).await { self.do_abort().await; } + if !self.do_update_bloom_filter(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. 
@@ -43,7 +46,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -68,7 +71,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -80,4 +83,29 @@ impl Indexer { false } + + /// Returns false if the update failed. + async fn do_update_bloom_filter(&mut self, batch: &Batch) -> bool { + let Some(creator) = self.bloom_filter_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } } diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index d0da804c745b..fd8845f96ac3 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -14,13 +14,25 @@ use std::path::PathBuf; +use async_trait::async_trait; +use common_error::ext::BoxedError; use common_telemetry::warn; +use futures::{AsyncRead, AsyncWrite}; +use index::error as index_error; +use index::error::Result as IndexResult; +use index::external_provider::ExternalTempFileProvider; use object_store::util::{self, normalize_dir}; +use snafu::ResultExt; use store_api::storage::{ColumnId, RegionId}; use uuid::Uuid; use crate::access_layer::new_fs_cache_store; use crate::error::Result; +use crate::metrics::{ + INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, 
INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, +}; use crate::sst::file::FileId; use crate::sst::index::store::InstrumentedStore; @@ -129,14 +141,105 @@ impl IntermediateLocation { } } +/// `TempFileProvider` implements `ExternalTempFileProvider`. +/// It uses `InstrumentedStore` to create and read intermediate files. +pub(crate) struct TempFileProvider { + /// Provides the location of intermediate files. + location: IntermediateLocation, + /// Provides store to access to intermediate files. + manager: IntermediateManager, +} + +#[async_trait] +impl ExternalTempFileProvider for TempFileProvider { + async fn create( + &self, + file_group: &str, + file_id: &str, + ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> { + let path = self.location.file_path(file_group, file_id); + let writer = self + .manager + .store() + .writer( + &path, + &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, + &INDEX_INTERMEDIATE_WRITE_OP_TOTAL, + &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + Ok(Box::new(writer)) + } + + async fn read_all( + &self, + file_group: &str, + ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> { + let file_group_path = self.location.file_group_path(file_group); + let entries = self + .manager + .store() + .list(&file_group_path) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + let mut readers = Vec::with_capacity(entries.len()); + + for entry in entries { + if entry.metadata().is_dir() { + warn!("Unexpected entry in index creation dir: {:?}", entry.path()); + continue; + } + + let im_file_id = self.location.im_file_id_from_path(entry.path()); + + let reader = self + .manager + .store() + .reader( + entry.path(), + &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + &INDEX_INTERMEDIATE_READ_OP_TOTAL, + &INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) +
.context(index_error::ExternalSnafu)?; + readers.push((im_file_id, Box::new(reader) as _)); + } + + Ok(readers) + } +} + +impl TempFileProvider { + /// Creates a new `TempFileProvider`. + pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { + Self { location, manager } + } + + /// Removes all intermediate files. + pub async fn cleanup(&self) -> Result<()> { + self.manager + .store() + .remove_all(self.location.dir_to_cleanup()) + .await + } +} + #[cfg(test)] mod tests { use std::ffi::OsStr; use common_test_util::temp_dir; + use futures::{AsyncReadExt, AsyncWriteExt}; use regex::Regex; + use store_api::storage::RegionId; use super::*; + use crate::sst::file::FileId; #[tokio::test] async fn test_manager() { @@ -212,4 +315,58 @@ mod tests { .is_match(&pi.next().unwrap().to_string_lossy())); // fulltext path assert!(pi.next().is_none()); } + + #[tokio::test] + async fn test_temp_file_provider_basic() { + let temp_dir = temp_dir::create_temp_dir("intermediate"); + let path = temp_dir.path().display().to_string(); + + let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); + let store = IntermediateManager::init_fs(path).await.unwrap(); + let provider = TempFileProvider::new(location.clone(), store); + + let file_group = "tag0"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_id = "0000000100"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"world").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_group = "tag1"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"foo").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let readers = 
provider.read_all("tag0").await.unwrap(); + assert_eq!(readers.len(), 2); + for (_, mut reader) in readers { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert!(matches!(buf.as_slice(), b"hello" | b"world")); + } + let readers = provider.read_all("tag1").await.unwrap(); + assert_eq!(readers.len(), 1); + let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"foo"); + + provider.cleanup().await.unwrap(); + + assert!(provider + .manager + .store() + .list(location.dir_to_cleanup()) + .await + .unwrap() + .is_empty()); + } } diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs index d325f735a431..73dca4ac47f2 100644 --- a/src/mito2/src/sst/index/inverted_index.rs +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -13,7 +13,6 @@ // limitations under the License. pub(crate) mod applier; -mod codec; pub(crate) mod creator; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index c2f90b293003..e14bb89bd1c9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -37,8 +37,8 @@ use crate::cache::file_cache::FileCacheRef; use crate::cache::index::inverted_index::InvertedIndexCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; +use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::inverted_index::applier::InvertedIndexApplier; -use crate::sst::index::inverted_index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; /// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan. 
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 0076322fccbd..138035d554a1 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod temp_provider; - use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -38,9 +36,10 @@ use crate::error::{ use crate::read::Batch; use crate::row_converter::SortField; use crate::sst::file::FileId; -use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; -use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec}; -use crate::sst::index::inverted_index::creator::temp_provider::TempFileProvider; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; diff --git a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs deleted file mode 100644 index 1822f3119459..000000000000 --- a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; -use common_error::ext::BoxedError; -use common_telemetry::warn; -use futures::{AsyncRead, AsyncWrite}; -use index::error as index_error; -use index::error::Result as IndexResult; -use index::external_provider::ExternalTempFileProvider; -use snafu::ResultExt; - -use crate::error::Result; -use crate::metrics::{ - INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL, - INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, - INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, -}; -use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; - -/// `TempFileProvider` implements `ExternalTempFileProvider`. -/// It uses `InstrumentedStore` to create and read intermediate files. -pub(crate) struct TempFileProvider { - /// Provides the location of intermediate files. - location: IntermediateLocation, - /// Provides store to access to intermediate files. 
- manager: IntermediateManager, -} - -#[async_trait] -impl ExternalTempFileProvider for TempFileProvider { - async fn create( - &self, - file_group: &str, - file_id: &str, - ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> { - let path = self.location.file_path(file_group, file_id); - let writer = self - .manager - .store() - .writer( - &path, - &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, - &INDEX_INTERMEDIATE_WRITE_OP_TOTAL, - &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, - ) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - Ok(Box::new(writer)) - } - - async fn read_all( - &self, - file_group: &str, - ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> { - let file_group_path = self.location.file_group_path(file_group); - let entries = self - .manager - .store() - .list(&file_group_path) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - let mut readers = Vec::with_capacity(entries.len()); - - for entry in entries { - if entry.metadata().is_dir() { - warn!("Unexpected entry in index creation dir: {:?}", entry.path()); - continue; - } - - let im_file_id = self.location.im_file_id_from_path(entry.path()); - - let reader = self - .manager - .store() - .reader( - entry.path(), - &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, - &INDEX_INTERMEDIATE_READ_OP_TOTAL, - &INDEX_INTERMEDIATE_SEEK_OP_TOTAL, - ) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - readers.push((im_file_id, Box::new(reader) as _)); - } - - Ok(readers) - } -} - -impl TempFileProvider { - /// Creates a new `TempFileProvider`. - pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { - Self { location, manager } - } - - /// Removes all intermediate files.
- pub async fn cleanup(&self) -> Result<()> { - self.manager - .store() - .remove_all(self.location.dir_to_cleanup()) - .await - } -} - -#[cfg(test)] -mod tests { - use common_test_util::temp_dir; - use futures::{AsyncReadExt, AsyncWriteExt}; - use store_api::storage::RegionId; - - use super::*; - use crate::sst::file::FileId; - - #[tokio::test] - async fn test_temp_file_provider_basic() { - let temp_dir = temp_dir::create_temp_dir("intermediate"); - let path = temp_dir.path().display().to_string(); - - let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); - let store = IntermediateManager::init_fs(path).await.unwrap(); - let provider = TempFileProvider::new(location.clone(), store); - - let file_group = "tag0"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_id = "0000000100"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"world").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_group = "tag1"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"foo").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let readers = provider.read_all("tag0").await.unwrap(); - assert_eq!(readers.len(), 2); - for (_, mut reader) in readers { - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert!(matches!(buf.as_slice(), b"hello" | b"world")); - } - let readers = provider.read_all("tag1").await.unwrap(); - assert_eq!(readers.len(), 1); - let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"foo"); - - provider.cleanup().await.unwrap(); - - 
assert!(provider - .manager - .store() - .list(location.dir_to_cleanup()) - .await - .unwrap() - .is_empty()); - } -}