diff --git a/Cargo.lock b/Cargo.lock index 6640573a353f..b8eafcbdcbcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3570,7 +3570,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2aaee38de81047537dfa42af9df63bcfb866e06c#2aaee38de81047537dfa42af9df63bcfb866e06c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b1d403088f02136bcebde53d604f491c260ca8e2#b1d403088f02136bcebde53d604f491c260ca8e2" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index 04012f49691f..a5ec4a69ff10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2aaee38de81047537dfa42af9df63bcfb866e06c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b1d403088f02136bcebde53d604f491c260ca8e2" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 65658c28709f..065e2d29991e 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -64,6 +64,9 @@ pub enum Error { payload_size: u64, }, + #[snafu(display("Unexpected zero segment row count"))] + UnexpectedZeroSegmentRowCount { location: Location }, + #[snafu(display("Failed to decode fst"))] DecodeFst { #[snafu(source)] @@ -109,6 +112,9 @@ pub enum Error { location: Location, predicates: Vec, }, + + #[snafu(display("index not found, name: {name}"))] + IndexNotFound { name: String, location: Location }, } impl ErrorExt for Error { @@ -118,6 +124,7 @@ impl ErrorExt for Error { Seek { .. } | Read { .. } | UnexpectedFooterPayloadSize { .. } + | UnexpectedZeroSegmentRowCount { .. } | UnexpectedOffsetSize { .. } | UnexpectedBlobSize { .. } | DecodeProto { .. } @@ -128,7 +135,8 @@ impl ErrorExt for Error { | ParseDFA { .. } | KeysApplierWithoutInList { .. } | IntersectionApplierWithInList { .. } - | EmptyPredicates { .. } => StatusCode::InvalidArguments, + | EmptyPredicates { .. } + | IndexNotFound { .. } => StatusCode::InvalidArguments, } } diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index e8e9c72c9579..705f4b409844 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -25,7 +25,7 @@ use crate::inverted_index::FstMap; /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] -pub trait InvertedIndexReader { +pub trait InvertedIndexReader: Send { /// Retrieve metadata of all inverted indices stored within the blob. async fn metadata(&mut self) -> Result; diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 40509e46bafb..ba8231194055 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -143,7 +143,11 @@ mod tests { }; // metas - let mut metas = InvertedIndexMetas::default(); + let mut metas = InvertedIndexMetas { + total_row_count: 10, + segment_row_count: 1, + ..Default::default() + }; metas.metas.insert(meta.name.clone(), meta); metas.metas.insert(meta1.name.clone(), meta1); let mut meta_buf = Vec::new(); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 77cc61d7c21e..478352ee685a 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, - UnexpectedOffsetSizeSnafu, + UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; @@ -85,6 +85,11 @@ impl InvertedIndeFooterReader { /// Check if the read metadata is consistent with expected sizes and offsets. fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> { + ensure!( + metas.segment_row_count > 0, + UnexpectedZeroSegmentRowCountSnafu + ); + for meta in metas.metas.values() { let InvertedIndexMeta { base_offset, @@ -116,7 +121,10 @@ mod tests { use super::*; fn create_test_payload(meta: InvertedIndexMeta) -> Vec { - let mut metas = InvertedIndexMetas::default(); + let mut metas = InvertedIndexMetas { + segment_row_count: 1, + ..Default::default() + }; metas.metas.insert("test".to_string(), meta); let mut payload_buf = vec![]; @@ -131,7 +139,6 @@ mod tests { async fn test_read_payload() { let meta = InvertedIndexMeta { name: "test".to_string(), - segment_row_count: 4096, ..Default::default() }; @@ -145,14 +152,12 @@ mod tests { assert_eq!(metas.metas.len(), 1); let index_meta = &metas.metas.get("test").unwrap(); assert_eq!(index_meta.name, "test"); - assert_eq!(index_meta.segment_row_count, 4096); } #[tokio::test] async fn test_invalid_footer_payload_size() { let meta = InvertedIndexMeta { name: "test".to_string(), - segment_row_count: 4096, ..Default::default() }; @@ -171,7 +176,6 @@ mod tests { name: "test".to_string(), base_offset: 0, inverted_index_size: 1, // Set size to 1 to make ecceed the blob size - segment_row_count: 4096, ..Default::default() }; diff --git a/src/index/src/inverted_index/search.rs b/src/index/src/inverted_index/search.rs index e4ab3d5c3b70..8e28440c7e28 100644 --- a/src/index/src/inverted_index/search.rs +++ b/src/index/src/inverted_index/search.rs @@ -14,4 +14,5 @@ pub mod fst_apply; pub mod fst_values_mapper; +pub mod index_apply; pub mod predicate; diff --git a/src/index/src/inverted_index/search/fst_apply.rs b/src/index/src/inverted_index/search/fst_apply.rs index 32b71b2bf3d9..9f54d0d88918 100644 --- a/src/index/src/inverted_index/search/fst_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply.rs @@ -22,6 +22,7 @@ use crate::inverted_index::FstMap; /// A trait for objects that can process a finite state transducer (FstMap) and return /// associated values. +#[mockall::automock] pub trait FstApplier: Send + Sync { /// Retrieves values from an FstMap. /// diff --git a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs index a5ed84dce7ce..4ec5710a3435 100644 --- a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs @@ -89,7 +89,7 @@ impl KeysFstApplier { fn get_list(p: &Predicate) -> &HashSet { match p { Predicate::InList(i) => &i.list, - _ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists + _ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists` } } diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs new file mode 100644 index 000000000000..35d8c387a2d6 --- /dev/null +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod predicates_apply; + +use async_trait::async_trait; +pub use predicates_apply::PredicatesIndexApplier; + +use crate::inverted_index::error::Result; +use crate::inverted_index::format::reader::InvertedIndexReader; + +/// A trait for processing and transforming indices obtained from an inverted index. +/// +/// Applier instances are reusable and work with various `InvertedIndexReader` instances, +/// avoiding repeated compilation of fixed predicates such as regex patterns. +#[async_trait] +pub trait IndexApplier { + /// Applies the predefined predicates to the data read by the given index reader, returning + /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). + async fn apply( + &self, + context: SearchContext, + reader: &mut dyn InvertedIndexReader, + ) -> Result>; +} + +/// A context for searching the inverted index. +#[derive(Clone, Debug, Eq, PartialEq, Default)] +pub struct SearchContext { + /// `index_not_found_strategy` controls the behavior of the applier when the index is not found. + pub index_not_found_strategy: IndexNotFoundStrategy, +} + +/// Defines the behavior of an applier when the index is not found. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)] +pub enum IndexNotFoundStrategy { + /// Return an empty list of indices. + #[default] + ReturnEmpty, + + /// Ignore the index and continue. + Ignore, + + /// Throw an error. + ThrowError, +} diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs new file mode 100644 index 000000000000..e2bea2756a7f --- /dev/null +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -0,0 +1,346 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use common_base::BitVec; +use greptime_proto::v1::index::InvertedIndexMetas; + +use crate::inverted_index::error::{IndexNotFoundSnafu, Result}; +use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::search::fst_apply::{ + FstApplier, IntersectionFstApplier, KeysFstApplier, +}; +use crate::inverted_index::search::fst_values_mapper::FstValuesMapper; +use crate::inverted_index::search::index_apply::{ + IndexApplier, IndexNotFoundStrategy, SearchContext, +}; +use crate::inverted_index::search::predicate::Predicate; + +type IndexName = String; + +/// `PredicatesIndexApplier` contains a collection of `FstApplier`s, each associated with an index name, +/// to process and filter index data based on compiled predicates. +pub struct PredicatesIndexApplier { + /// A list of `FstApplier`s, each associated with a specific index name + /// (e.g. a tag field uses its column name as index name) + fst_appliers: Vec<(IndexName, Box)>, +} + +#[async_trait] +impl IndexApplier for PredicatesIndexApplier { + /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual + /// bitmaps obtained for each index to result in a final set of indices. + async fn apply( + &self, + context: SearchContext, + reader: &mut dyn InvertedIndexReader, + ) -> Result> { + let metadata = reader.metadata().await?; + + let mut bitmap = Self::bitmap_full_range(&metadata); + // TODO(zhongzc): optimize the order of applying to make it quicker to return empty. + for (name, fst_applier) in &self.fst_appliers { + if bitmap.count_ones() == 0 { + break; + } + + let Some(meta) = metadata.metas.get(name) else { + match context.index_not_found_strategy { + IndexNotFoundStrategy::ReturnEmpty => { + return Ok(vec![]); + } + IndexNotFoundStrategy::Ignore => { + continue; + } + IndexNotFoundStrategy::ThrowError => { + return IndexNotFoundSnafu { name }.fail(); + } + } + }; + + let fst = reader.fst(meta).await?; + let values = fst_applier.apply(&fst); + + let mut mapper = FstValuesMapper::new(&mut *reader, meta); + let bm = mapper.map_values(&values).await?; + + bitmap &= bm; + } + + Ok(bitmap.iter_ones().collect()) + } +} + +impl PredicatesIndexApplier { + /// Constructs an instance of `PredicatesIndexApplier` based on a list of tag predicates. + /// Chooses an appropriate `FstApplier` for each index name based on the nature of its predicates. + pub fn try_from(mut predicates: Vec<(IndexName, Vec)>) -> Result { + let mut fst_appliers = Vec::with_capacity(predicates.len()); + + // InList predicates are applied first to benefit from higher selectivity. + let in_list_index = predicates + .iter_mut() + .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_)))); + let mut iter = predicates.into_iter(); + for _ in 0..in_list_index { + let (tag_name, predicates) = iter.next().unwrap(); + let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _; + fst_appliers.push((tag_name, fst_applier)); + } + + for (tag_name, predicates) in iter { + if predicates.is_empty() { + continue; + } + let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _; + fst_appliers.push((tag_name, fst_applier)); + } + + Ok(PredicatesIndexApplier { fst_appliers }) + } + + /// Creates a `BitVec` representing the full range of data in the index for initial scanning. + fn bitmap_full_range(metadata: &InvertedIndexMetas) -> BitVec { + let total_count = metadata.total_row_count; + let segment_count = metadata.segment_row_count; + let len = (total_count + segment_count - 1) / segment_count; + BitVec::repeat(true, len as _) + } +} + +impl TryFrom)>> for PredicatesIndexApplier { + type Error = crate::inverted_index::error::Error; + fn try_from(predicates: Vec<(String, Vec)>) -> Result { + Self::try_from(predicates) + } +} + +#[cfg(test)] +mod tests { + use common_base::bit_vec::prelude::*; + use greptime_proto::v1::index::InvertedIndexMeta; + + use super::*; + use crate::inverted_index::error::Error; + use crate::inverted_index::format::reader::MockInvertedIndexReader; + use crate::inverted_index::search::fst_apply::MockFstApplier; + use crate::inverted_index::FstMap; + + fn s(s: &'static str) -> String { + s.to_owned() + } + + fn mock_metas(tags: impl IntoIterator) -> InvertedIndexMetas { + let mut metas = InvertedIndexMetas { + total_row_count: 8, + segment_row_count: 1, + ..Default::default() + }; + for tag in tags.into_iter() { + let meta = InvertedIndexMeta { + name: s(tag), + ..Default::default() + }; + metas.metas.insert(s(tag), meta); + } + metas + } + + fn key_fst_applier(value: &'static str) -> Box { + let mut mock_fst_applier = MockFstApplier::new(); + mock_fst_applier + .expect_apply() + .returning(move |fst| fst.get(value).into_iter().collect()); + Box::new(mock_fst_applier) + } + + fn fst_value(offset: u32, size: u32) -> u64 { + bytemuck::cast::<_, u64>([offset, size]) + } + + #[tokio::test] + async fn test_index_applier_apply_get_key() { + // An index applier that point-gets "tag-0_value-0" on tag "tag-0" + let applier = PredicatesIndexApplier { + fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))], + }; + + // An index reader with a single tag "tag-0" and a corresponding value "tag-0_value-0" + let mut mock_reader = MockInvertedIndexReader::new(); + mock_reader + .expect_metadata() + .returning(|| Ok(mock_metas(["tag-0"]))); + mock_reader + .expect_fst() + .returning(|meta| match meta.name.as_str() { + "tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap()), + _ => unreachable!(), + }); + mock_reader.expect_bitmap().returning(|meta, offset, size| { + match (meta.name.as_str(), offset, size) { + ("tag-0", 2, 1) => Ok(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]), + _ => unreachable!(), + } + }); + let indices = applier + .apply(SearchContext::default(), &mut mock_reader) + .await + .unwrap(); + assert_eq!(indices, vec![0, 2, 4, 6]); + + // An index reader with a single tag "tag-0" but without value "tag-0_value-0" + let mut mock_reader = MockInvertedIndexReader::new(); + mock_reader + .expect_metadata() + .returning(|| Ok(mock_metas(["tag-0"]))); + mock_reader + .expect_fst() + .returning(|meta| match meta.name.as_str() { + "tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap()), + _ => unreachable!(), + }); + let indices = applier + .apply(SearchContext::default(), &mut mock_reader) + .await + .unwrap(); + assert!(indices.is_empty()); + } + + #[tokio::test] + async fn test_index_applier_apply_intersection_with_two_tags() { + // An index applier that intersects "tag-0_value-0" on tag "tag-0" and "tag-1_value-a" on tag "tag-1" + let applier = PredicatesIndexApplier { + fst_appliers: vec![ + (s("tag-0"), key_fst_applier("tag-0_value-0")), + (s("tag-1"), key_fst_applier("tag-1_value-a")), + ], + }; + + // An index reader with two tags "tag-0" and "tag-1" and respective values "tag-0_value-0" and "tag-1_value-a" + let mut mock_reader = MockInvertedIndexReader::new(); + mock_reader + .expect_metadata() + .returning(|| Ok(mock_metas(["tag-0", "tag-1"]))); + mock_reader + .expect_fst() + .returning(|meta| match meta.name.as_str() { + "tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()), + "tag-1" => Ok(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()), + _ => unreachable!(), + }); + mock_reader.expect_bitmap().returning(|meta, offset, size| { + match (meta.name.as_str(), offset, size) { + ("tag-0", 1, 1) => Ok(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]), + ("tag-1", 2, 1) => Ok(bitvec![u8, Lsb0; 1, 1, 0, 1, 1, 0, 1, 1]), + _ => unreachable!(), + } + }); + + let indices = applier + .apply(SearchContext::default(), &mut mock_reader) + .await + .unwrap(); + assert_eq!(indices, vec![0, 4, 6]); + } + + #[tokio::test] + async fn test_index_applier_without_predicates() { + let applier = PredicatesIndexApplier { + fst_appliers: vec![], + }; + + let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new(); + mock_reader + .expect_metadata() + .returning(|| Ok(mock_metas(["tag-0"]))); + + let indices = applier + .apply(SearchContext::default(), &mut mock_reader) + .await + .unwrap(); + assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan + } + + #[tokio::test] + async fn test_index_applier_with_empty_index() { + let mut mock_reader = MockInvertedIndexReader::new(); + mock_reader.expect_metadata().returning(move || { + Ok(InvertedIndexMetas { + total_row_count: 0, // No rows + segment_row_count: 1, + ..Default::default() + }) + }); + + let mut mock_fst_applier = MockFstApplier::new(); + mock_fst_applier.expect_apply().never(); + + let applier = PredicatesIndexApplier { + fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))], + }; + + let indices = applier + .apply(SearchContext::default(), &mut mock_reader) + .await + .unwrap(); + assert!(indices.is_empty()); + } + + #[tokio::test] + async fn test_index_applier_with_nonexistent_index() { + let mut mock_reader = MockInvertedIndexReader::new(); + mock_reader + .expect_metadata() + .returning(|| Ok(mock_metas(vec![]))); + + let mut mock_fst_applier = MockFstApplier::new(); + mock_fst_applier.expect_apply().never(); + + let applier = PredicatesIndexApplier { + fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))], + }; + + let result = applier + .apply( + SearchContext { + index_not_found_strategy: IndexNotFoundStrategy::ThrowError, + }, + &mut mock_reader, + ) + .await; + assert!(matches!(result, Err(Error::IndexNotFound { .. }))); + + let indices = applier + .apply( + SearchContext { + index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, + }, + &mut mock_reader, + ) + .await + .unwrap(); + assert!(indices.is_empty()); + + let indices = applier + .apply( + SearchContext { + index_not_found_strategy: IndexNotFoundStrategy::Ignore, + }, + &mut mock_reader, + ) + .await + .unwrap(); + assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); + } +} diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 18a29fad0378..1102bee0e6ab 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -167,6 +167,7 @@ impl HeartbeatHandler for RegionLeaseHandler { .collect::>(), duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, + closeable_region_ids: vec![], }); Ok(HandleControl::Continue)