Skip to content

Commit

Permalink
feat(inverted_index): Add applier builder to convert Expr to Predicat…
Browse files Browse the repository at this point in the history
…es (Part 1) (#3034)

* feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1)

Signed-off-by: Zhenchi <[email protected]>

* chore: add docs

Signed-off-by: Zhenchi <[email protected]>

* fix: typos

Signed-off-by: Zhenchi <[email protected]>

* fix: address comments

Signed-off-by: Zhenchi <[email protected]>

* Update src/mito2/src/sst/index/applier/builder.rs

Co-authored-by: Yingwen <[email protected]>

* fix: remove unwrap

Signed-off-by: Zhenchi <[email protected]>

* chore: error source

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
zhongzc and evenyag authored Dec 30, 2023
1 parent 1c94d4c commit 69a5313
Show file tree
Hide file tree
Showing 14 changed files with 670 additions and 52 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.1", features = ["transducer"] }
regex-automata = { version = "0.2", features = ["transducer"] }
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
Expand Down Expand Up @@ -169,6 +169,7 @@ datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
Expand Down
2 changes: 1 addition & 1 deletion src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The maximum log size a kakfa batch producer could buffer.
/// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub enum Error {
#[snafu(display("Failed to parse regex DFA"))]
ParseDFA {
#[snafu(source)]
error: regex_automata::Error,
error: Box<regex_automata::dfa::Error>,
location: Location,
},

Expand Down
112 changes: 65 additions & 47 deletions src/index/src/inverted_index/search/fst_apply/intersection_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use fst::map::OpBuilder;
use fst::{IntoStreamer, Streamer};
use regex_automata::DenseDFA;
use regex_automata::dfa::dense::DFA;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
Expand All @@ -24,15 +24,13 @@ use crate::inverted_index::search::fst_apply::FstApplier;
use crate::inverted_index::search::predicate::{Predicate, Range};
use crate::inverted_index::FstMap;

type Dfa = DenseDFA<Vec<usize>, usize>;

/// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns.
pub struct IntersectionFstApplier {
/// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap.
ranges: Vec<Range>,

/// A list of dense `DFA`s compiled from regular expression patterns.
dfas: Vec<Dfa>,
dfas: Vec<DFA<Vec<u32>>>,
}

impl FstApplier for IntersectionFstApplier {
Expand Down Expand Up @@ -88,8 +86,8 @@ impl IntersectionFstApplier {
match predicate {
Predicate::Range(range) => ranges.push(range.range),
Predicate::RegexMatch(regex) => {
let dfa = DenseDFA::new(&regex.pattern);
let dfa = dfa.context(ParseDFASnafu)?;
let dfa = DFA::new(&regex.pattern);
let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?;
dfas.push(dfa);
}
// Rejection of `InList` predicates is enforced here.
Expand Down Expand Up @@ -210,47 +208,67 @@ mod tests {

#[test]
fn test_intersection_fst_applier_with_valid_pattern() {
let test_fst = FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap();

let applier = create_applier_from_pattern("a.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern("b.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![2]);

let applier = create_applier_from_pattern("c.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![3]);

let applier = create_applier_from_pattern("a.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern("b.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![2]);

let applier = create_applier_from_pattern("c.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![3]);

let applier = create_applier_from_pattern("d.?").unwrap();
let results = applier.apply(&test_fst);
assert!(results.is_empty());

let applier = create_applier_from_pattern("a.?|b.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1, 2]);

let applier = create_applier_from_pattern("d.?|a.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern(".*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1, 2, 3]);
let test_fst = FstMap::from_iter([("123", 1), ("abc", 2)]).unwrap();

let cases = vec![
("1", vec![1]),
("2", vec![1]),
("3", vec![1]),
("^1", vec![1]),
("^2", vec![]),
("^3", vec![]),
("^1.*", vec![1]),
("^.*2", vec![1]),
("^.*3", vec![1]),
("1$", vec![]),
("2$", vec![]),
("3$", vec![1]),
("1.*$", vec![1]),
("2.*$", vec![1]),
("3.*$", vec![1]),
("^1..$", vec![1]),
("^.2.$", vec![1]),
("^..3$", vec![1]),
("^[0-9]", vec![1]),
("^[0-9]+$", vec![1]),
("^[0-9][0-9]$", vec![]),
("^[0-9][0-9][0-9]$", vec![1]),
("^123$", vec![1]),
("a", vec![2]),
("b", vec![2]),
("c", vec![2]),
("^a", vec![2]),
("^b", vec![]),
("^c", vec![]),
("^a.*", vec![2]),
("^.*b", vec![2]),
("^.*c", vec![2]),
("a$", vec![]),
("b$", vec![]),
("c$", vec![2]),
("a.*$", vec![2]),
("b.*$", vec![2]),
("c.*$", vec![2]),
("^.[a-z]", vec![2]),
("^abc$", vec![2]),
("^ab$", vec![]),
("abc$", vec![2]),
("^a.c$", vec![2]),
("^..c$", vec![2]),
("ab", vec![2]),
(".*", vec![1, 2]),
("", vec![1, 2]),
("^$", vec![]),
("1|a", vec![1, 2]),
("^123$|^abc$", vec![1, 2]),
("^123$|d", vec![1]),
];

for (pattern, expected) in cases {
let applier = create_applier_from_pattern(pattern).unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, expected);
}
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ datafusion.workspace = true
datatypes.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
Expand Down
20 changes: 20 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,23 @@ pub enum Error {
#[snafu(source)]
error: parquet::errors::ParquetError,
},

#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound { column: String, location: Location },

#[snafu(display("Failed to build index applier"))]
BuildIndexApplier {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -468,6 +485,7 @@ impl ErrorExt for Error {
| InvalidRequest { .. }
| FillDefault { .. }
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }
| InvalidMetadata { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. }
| Join { .. }
Expand Down Expand Up @@ -504,6 +522,8 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
BuildIndexApplier { source, .. } => source.status_code(),
ConvertValue { source, .. } => source.status_code(),
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/row_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl SortField {
}

impl SortField {
fn serialize(&self, serializer: &mut Serializer<&mut Vec<u8>>, value: &ValueRef) -> Result<()> {
pub(crate) fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
macro_rules! cast_value_and_serialize {
(
$self: ident;
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
pub mod file;
pub mod file_purger;
mod index;
pub mod parquet;
pub(crate) mod version;
18 changes: 18 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE(review): blanket `dead_code` allowance for this module tree. Presumably
// temporary while the index applier is not yet called from the rest of the
// crate (the change is labelled "Part 1") — narrow or remove it once the items
// below are actually used.
#![allow(dead_code)]

// Applier that evaluates predicates against SST index files.
pub mod applier;
mod codec;
47 changes: 47 additions & 0 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;

use index::inverted_index::search::index_apply::IndexApplier;
use object_store::ObjectStore;

/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
///
/// Instances are created with [`SstIndexApplier::new`], which stores the three
/// components below unchanged. All fields are private to this module.
pub struct SstIndexApplier {
/// The root directory of the region.
region_dir: String,

/// Object store responsible for accessing SST files.
object_store: ObjectStore,

/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
///
/// Held as a boxed trait object, so any [`IndexApplier`] implementation can be supplied.
index_applier: Box<dyn IndexApplier>,
}

impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
pub fn new(
region_dir: String,
object_store: ObjectStore,
index_applier: Box<dyn IndexApplier>,
) -> Self {
Self {
region_dir,
object_store,
index_applier,
}
}
}
Loading

0 comments on commit 69a5313

Please sign in to comment.