From 69a53130c223a821bf8ca1b9a028667c5a1c9bf9 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 30 Dec 2023 15:32:32 +0800 Subject: [PATCH] feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 1) (#3034) * feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1) Signed-off-by: Zhenchi * chore: add docs Signed-off-by: Zhenchi * fix: typos Signed-off-by: Zhenchi * fix: address comments Signed-off-by: Zhenchi * Update src/mito2/src/sst/index/applier/builder.rs Co-authored-by: Yingwen * fix: remove unwrap Signed-off-by: Zhenchi * chore: error source Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi Co-authored-by: Yingwen --- Cargo.lock | 13 +- Cargo.toml | 3 +- src/common/config/src/wal/kafka.rs | 2 +- src/index/src/inverted_index/error.rs | 2 +- .../search/fst_apply/intersection_apply.rs | 112 ++++---- src/mito2/Cargo.toml | 1 + src/mito2/src/error.rs | 20 ++ src/mito2/src/row_converter.rs | 6 +- src/mito2/src/sst.rs | 1 + src/mito2/src/sst/index.rs | 18 ++ src/mito2/src/sst/index/applier.rs | 47 ++++ src/mito2/src/sst/index/applier/builder.rs | 261 ++++++++++++++++++ .../src/sst/index/applier/builder/between.rs | 171 ++++++++++++ src/mito2/src/sst/index/codec.rs | 65 +++++ 14 files changed, 670 insertions(+), 52 deletions(-) create mode 100644 src/mito2/src/sst/index.rs create mode 100644 src/mito2/src/sst/index/applier.rs create mode 100644 src/mito2/src/sst/index/applier/builder.rs create mode 100644 src/mito2/src/sst/index/applier/builder/between.rs create mode 100644 src/mito2/src/sst/index/codec.rs diff --git a/Cargo.lock b/Cargo.lock index d179ea6c8c54..b042227a293b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4029,7 +4029,7 @@ dependencies = [ "prost 0.12.3", "rand", "regex", - "regex-automata 0.1.10", + "regex-automata 0.2.0", "snafu", "tokio", "tokio-util", @@ -4977,6 +4977,7 @@ dependencies = [ "datatypes", "futures", "humantime-serde", + "index", "lazy_static", "log-store", 
"memcomparable", @@ -7134,8 +7135,18 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9368763f5a9b804326f3af749e16f9abf378d227bcdee7634b13d8f17793782" dependencies = [ "fst", + "memchr", "regex-syntax 0.6.29", ] diff --git a/Cargo.toml b/Cargo.toml index 0e38d914eccb..a3413aa9d48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,7 +111,7 @@ prost = "0.12" raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } rand = "0.8" regex = "1.8" -regex-automata = { version = "0.1", features = ["transducer"] } +regex-automata = { version = "0.2", features = ["transducer"] } reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls-native-roots", @@ -169,6 +169,7 @@ datanode = { path = "src/datanode" } datatypes = { path = "src/datatypes" } file-engine = { path = "src/file-engine" } frontend = { path = "src/frontend" } +index = { path = "src/index" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index e93aa6cb2271..858991264bb6 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -42,7 +42,7 @@ pub struct KafkaConfig { #[serde(skip)] #[serde(default)] pub compression: RsKafkaCompression, - /// The maximum log size a kakfa batch producer could buffer. + /// The maximum log size a kafka batch producer could buffer. pub max_batch_size: ReadableSize, /// The linger duration of a kafka batch producer. 
#[serde(with = "humantime_serde")] diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index b795e33003b7..6e5f39006eb9 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -113,7 +113,7 @@ pub enum Error { #[snafu(display("Failed to parse regex DFA"))] ParseDFA { #[snafu(source)] - error: regex_automata::Error, + error: Box<regex_automata::dfa::Error>, location: Location, }, diff --git a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs index a0ae0d7b9afb..a608acd0bab5 100644 --- a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs @@ -14,7 +14,7 @@ use fst::map::OpBuilder; use fst::{IntoStreamer, Streamer}; -use regex_automata::DenseDFA; +use regex_automata::dfa::dense::DFA; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ @@ -24,15 +24,13 @@ use crate::inverted_index::search::fst_apply::FstApplier; use crate::inverted_index::search::predicate::{Predicate, Range}; use crate::inverted_index::FstMap; -type Dfa = DenseDFA<Vec<usize>, usize>; - /// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns. pub struct IntersectionFstApplier { /// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap. ranges: Vec<Range>, /// A list of `Dfa` compiled from regular expression patterns. 
- dfas: Vec<Dfa>, + dfas: Vec<DFA<Vec<u32>>>, } impl FstApplier for IntersectionFstApplier { @@ -88,8 +86,8 @@ impl IntersectionFstApplier { match predicate { Predicate::Range(range) => ranges.push(range.range), Predicate::RegexMatch(regex) => { - let dfa = DenseDFA::new(&regex.pattern); - let dfa = dfa.context(ParseDFASnafu)?; + let dfa = DFA::new(&regex.pattern); + let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?; dfas.push(dfa); } // Rejection of `InList` predicates is enforced here. @@ -210,47 +208,67 @@ mod tests { #[test] fn test_intersection_fst_applier_with_valid_pattern() { - let test_fst = FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap(); - - let applier = create_applier_from_pattern("a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("a.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("d.?").unwrap(); - let results = applier.apply(&test_fst); - assert!(results.is_empty()); - - let applier = create_applier_from_pattern("a.?|b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1, 2]); - - let applier = create_applier_from_pattern("d.?|a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern(".*").unwrap(); - let results = applier.apply(&test_fst); 
- assert_eq!(results, vec![1, 2, 3]); + let test_fst = FstMap::from_iter([("123", 1), ("abc", 2)]).unwrap(); + + let cases = vec![ + ("1", vec![1]), + ("2", vec![1]), + ("3", vec![1]), + ("^1", vec![1]), + ("^2", vec![]), + ("^3", vec![]), + ("^1.*", vec![1]), + ("^.*2", vec![1]), + ("^.*3", vec![1]), + ("1$", vec![]), + ("2$", vec![]), + ("3$", vec![1]), + ("1.*$", vec![1]), + ("2.*$", vec![1]), + ("3.*$", vec![1]), + ("^1..$", vec![1]), + ("^.2.$", vec![1]), + ("^..3$", vec![1]), + ("^[0-9]", vec![1]), + ("^[0-9]+$", vec![1]), + ("^[0-9][0-9]$", vec![]), + ("^[0-9][0-9][0-9]$", vec![1]), + ("^123$", vec![1]), + ("a", vec![2]), + ("b", vec![2]), + ("c", vec![2]), + ("^a", vec![2]), + ("^b", vec![]), + ("^c", vec![]), + ("^a.*", vec![2]), + ("^.*b", vec![2]), + ("^.*c", vec![2]), + ("a$", vec![]), + ("b$", vec![]), + ("c$", vec![2]), + ("a.*$", vec![2]), + ("b.*$", vec![2]), + ("c.*$", vec![2]), + ("^.[a-z]", vec![2]), + ("^abc$", vec![2]), + ("^ab$", vec![]), + ("abc$", vec![2]), + ("^a.c$", vec![2]), + ("^..c$", vec![2]), + ("ab", vec![2]), + (".*", vec![1, 2]), + ("", vec![1, 2]), + ("^$", vec![]), + ("1|a", vec![1, 2]), + ("^123$|^abc$", vec![1, 2]), + ("^123$|d", vec![1]), + ]; + + for (pattern, expected) in cases { + let applier = create_applier_from_pattern(pattern).unwrap(); + let results = applier.apply(&test_fst); + assert_eq!(results, expected); + } } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 8c3ef50ec2c7..a28e4f0426ea 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -39,6 +39,7 @@ datafusion.workspace = true datatypes.workspace = true futures.workspace = true humantime-serde.workspace = true +index.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 39457281d76b..68a35123ea39 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -423,6 +423,23 @@ pub enum Error { 
#[snafu(source)] error: parquet::errors::ParquetError, }, + + #[snafu(display("Column not found, column: {column}"))] + ColumnNotFound { column: String, location: Location }, + + #[snafu(display("Failed to build index applier"))] + BuildIndexApplier { + #[snafu(source)] + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Failed to convert value"))] + ConvertValue { + #[snafu(source)] + source: datatypes::error::Error, + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -468,6 +485,7 @@ impl ErrorExt for Error { | InvalidRequest { .. } | FillDefault { .. } | ConvertColumnDataType { .. } + | ColumnNotFound { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } @@ -504,6 +522,8 @@ impl ErrorExt for Error { JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, + BuildIndexApplier { source, .. } => source.status_code(), + ConvertValue { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 4cc6fd3274ac..33ef05433521 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -84,7 +84,11 @@ impl SortField { } impl SortField { - fn serialize(&self, serializer: &mut Serializer<&mut Vec<u8>>, value: &ValueRef) -> Result<()> { + pub(crate) fn serialize( + &self, + serializer: &mut Serializer<&mut Vec<u8>>, + value: &ValueRef, + ) -> Result<()> { macro_rules! 
cast_value_and_serialize { ( $self: ident; diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 32c7b4951a55..55939c2d246a 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -16,5 +16,6 @@ pub mod file; pub mod file_purger; +mod index; pub mod parquet; pub(crate) mod version; diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs new file mode 100644 index 000000000000..baffda27aa6e --- /dev/null +++ b/src/mito2/src/sst/index.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +pub mod applier; +mod codec; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs new file mode 100644 index 000000000000..95ca25ba003d --- /dev/null +++ b/src/mito2/src/sst/index/applier.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod builder; + +use index::inverted_index::search::index_apply::IndexApplier; +use object_store::ObjectStore; + +/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files +/// and returning the relevant row group ids for further scan. +pub struct SstIndexApplier { + /// The root directory of the region. + region_dir: String, + + /// Object store responsible for accessing SST files. + object_store: ObjectStore, + + /// Predefined index applier used to apply predicates to index files + /// and return the relevant row group ids for further scan. + index_applier: Box<dyn IndexApplier>, +} + +impl SstIndexApplier { + /// Creates a new [`SstIndexApplier`]. + pub fn new( + region_dir: String, + object_store: ObjectStore, + index_applier: Box<dyn IndexApplier>, + ) -> Self { + Self { + region_dir, + object_store, + index_applier, + } + } +} diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs new file mode 100644 index 000000000000..52af22effb18 --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -0,0 +1,261 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod between; + +// TODO(zhongzc): This PR is too large. The following modules are coming soon. 
+ +// mod comparison; +// mod eq_list; +// mod in_list; +// mod regex_match; + +use std::collections::HashMap; + +use api::v1::SemanticType; +use common_query::logical_plan::Expr; +use common_telemetry::warn; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr as DfExpr; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use index::inverted_index::search::index_apply::PredicatesIndexApplier; +use index::inverted_index::search::predicate::Predicate; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; + +use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; +use crate::row_converter::SortField; +use crate::sst::index::applier::SstIndexApplier; +use crate::sst::index::codec::IndexValueCodec; + +type ColumnName = String; + +/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. +pub struct SstIndexApplierBuilder<'a> { + /// Directory of the region, required argument for constructing [`SstIndexApplier`]. + region_dir: String, + + /// Object store, required argument for constructing [`SstIndexApplier`]. + object_store: ObjectStore, + + /// Metadata of the region, used to get metadata like column type. + metadata: &'a RegionMetadata, + + /// Stores predicates during traversal on the Expr tree. + output: HashMap<ColumnName, Vec<Predicate>>, +} + +impl<'a> SstIndexApplierBuilder<'a> { + /// Creates a new [`SstIndexApplierBuilder`]. + pub fn new( + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + ) -> Self { + Self { + region_dir, + object_store, + metadata, + output: HashMap::default(), + } + } + + /// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on + /// the expressions provided. If no predicates match, returns `None`. 
+ pub fn build(mut self, exprs: &[Expr]) -> Result<Option<SstIndexApplier>> { + for expr in exprs { + self.traverse_and_collect(expr.df_expr()); + } + + if self.output.is_empty() { + return Ok(None); + } + + let predicates = self.output.into_iter().collect(); + let applier = PredicatesIndexApplier::try_from(predicates); + Ok(Some(SstIndexApplier::new( + self.region_dir, + self.object_store, + Box::new(applier.context(BuildIndexApplierSnafu)?), + ))) + } + + /// Recursively traverses expressions to collect predicates. + /// Results are stored in `self.output`. + fn traverse_and_collect(&mut self, expr: &DfExpr) { + let res = match expr { + DfExpr::Between(between) => self.collect_between(between), + + // TODO(zhongzc): This PR is too large. The following arms are coming soon. + + // DfExpr::InList(in_list) => self.collect_inlist(in_list), + // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + // Operator::And => { + // self.traverse_and_collect(left); + // self.traverse_and_collect(right); + // Ok(()) + // } + // Operator::Or => self.collect_or_eq_list(left, right), + // Operator::Eq => self.collect_eq(left, right), + // Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + // self.collect_comparison_expr(left, op, right) + // } + // Operator::RegexMatch => self.collect_regex_match(left, right), + // _ => Ok(()), + // }, + + // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ... + _ => Ok(()), + }; + + if let Err(err) = res { + warn!(err; "Failed to collect predicates, ignore it. expr: {expr}"); + } + } + + /// Helper function to add a predicate to the output. + fn add_predicate(&mut self, column_name: &str, predicate: Predicate) { + match self.output.get_mut(column_name) { + Some(predicates) => predicates.push(predicate), + None => { + self.output.insert(column_name.to_string(), vec![predicate]); + } + } + } + + /// Helper function to get the column type of a tag column. + /// Returns `None` if the column is not a tag column. 
+ fn tag_column_type(&self, column_name: &str) -> Result<Option<ConcreteDataType>> { + let column = self + .metadata + .column_by_name(column_name) + .context(ColumnNotFoundSnafu { + column: column_name, + })?; + + Ok((column.semantic_type == SemanticType::Tag) + .then(|| column.column_schema.data_type.clone())) + } + + /// Helper function to get a non-null literal. + fn nonnull_lit(expr: &DfExpr) -> Option<&ScalarValue> { + match expr { + DfExpr::Literal(lit) if !lit.is_null() => Some(lit), + _ => None, + } + } + + /// Helper function to get the column name of a column expression. + fn column_name(expr: &DfExpr) -> Option<&str> { + match expr { + DfExpr::Column(column) => Some(&column.name), + _ => None, + } + } + + /// Helper function to encode a literal into bytes. + fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> { + let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; + let mut bytes = vec![]; + let field = SortField::new(data_type); + IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?; + Ok(bytes) + } +} + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datafusion_common::Column; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use object_store::ObjectStore; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + pub(crate) fn test_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + 
+ .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() + } + + pub(crate) fn test_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + pub(crate) fn tag_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "a".to_string(), + }) + } + + pub(crate) fn field_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "b".to_string(), + }) + } + + pub(crate) fn nonexistent_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "nonexistent".to_string(), + }) + } + + pub(crate) fn string_lit(s: impl Into<String>) -> DfExpr { + DfExpr::Literal(ScalarValue::Utf8(Some(s.into()))) + } + + pub(crate) fn int64_lit(i: impl Into<i64>) -> DfExpr { + DfExpr::Literal(ScalarValue::Int64(Some(i.into()))) + } + + pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> { + let mut bytes = vec![]; + IndexValueCodec::encode_value( + Value::from(s.into()).as_value_ref(), + &SortField::new(ConcreteDataType::string_datatype()), + &mut bytes, + ) + .unwrap(); + bytes + } +} diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs new file mode 100644 index 000000000000..50ae7073b2db --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use datafusion_expr::Between; +use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`. + pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> { + if between.negated { + return Ok(()); + } + + let Some(column_name) = Self::column_name(&between.expr) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? else { + return Ok(()); + }; + let Some(low) = Self::nonnull_lit(&between.low) else { + return Ok(()); + }; + let Some(high) = Self::nonnull_lit(&between.high) else { + return Ok(()); + }; + + let predicate = Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: Self::encode_lit(low, data_type.clone())?, + }), + upper: Some(Bound { + inclusive: true, + value: Self::encode_lit(high, data_type)?, + }), + }, + }); + + self.add_predicate(column_name, predicate); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_between_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + 
assert_eq!( + predicates[0], + Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: encoded_string("abc"), + }), + upper: Some(Bound { + inclusive: true, + value: encoded_string("def"), + }), + } + }) + ); + } + + #[test] + fn test_collect_between_negated() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: true, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(field_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(int64_lit(123)), + high: Box::new(int64_lit(456)), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. 
}))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(nonexistent_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs new file mode 100644 index 000000000000..ada5ac07cbfc --- /dev/null +++ b/src/mito2/src/sst/index/codec.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::value::ValueRef; +use memcomparable::Serializer; + +use crate::error::Result; +use crate::row_converter::SortField; + +/// Encodes index values according to their data types for sorting and storage use. +pub struct IndexValueCodec; + +impl IndexValueCodec { + /// Serializes a `ValueRef` using the data type defined in `SortField` and writes + /// the result into a buffer. + /// + /// # Arguments + /// * `value` - The value to be encoded. + /// * `field` - Contains data type to guide serialization. + /// * `buffer` - Destination buffer for the serialized value. 
+ pub fn encode_value(value: ValueRef, field: &SortField, buffer: &mut Vec<u8>) -> Result<()> { + buffer.reserve(field.estimated_size()); + let mut serializer = Serializer::new(buffer); + field.serialize(&mut serializer, &value) + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + + use super::*; + use crate::error::Error; + + #[test] + fn test_encode_value_basic() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::string_datatype()); + + let mut buffer = Vec::new(); + IndexValueCodec::encode_value(value, &field, &mut buffer).unwrap(); + assert!(!buffer.is_empty()); + } + + #[test] + fn test_encode_value_type_mismatch() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::int64_datatype()); + + let mut buffer = Vec::new(); + let res = IndexValueCodec::encode_value(value, &field, &mut buffer); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + } +}