Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 1) #3034

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.1", features = ["transducer"] }
regex-automata = { version = "0.2", features = ["transducer"] }
evenyag marked this conversation as resolved.
Show resolved Hide resolved
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
Expand Down Expand Up @@ -169,6 +169,7 @@ datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
Expand Down
2 changes: 1 addition & 1 deletion src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The maximum log size a kakfa batch producer could buffer.
/// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub enum Error {
#[snafu(display("Failed to parse regex DFA"))]
ParseDFA {
#[snafu(source)]
error: regex_automata::Error,
error: Box<regex_automata::dfa::Error>,
location: Location,
},

Expand Down
112 changes: 65 additions & 47 deletions src/index/src/inverted_index/search/fst_apply/intersection_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use fst::map::OpBuilder;
use fst::{IntoStreamer, Streamer};
use regex_automata::DenseDFA;
use regex_automata::dfa::dense::DFA;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
Expand All @@ -24,15 +24,13 @@ use crate::inverted_index::search::fst_apply::FstApplier;
use crate::inverted_index::search::predicate::{Predicate, Range};
use crate::inverted_index::FstMap;

type Dfa = DenseDFA<Vec<usize>, usize>;

/// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns.
pub struct IntersectionFstApplier {
/// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap.
ranges: Vec<Range>,

/// A list of `Dfa` compiled from regular expression patterns.
dfas: Vec<Dfa>,
dfas: Vec<DFA<Vec<u32>>>,
}

impl FstApplier for IntersectionFstApplier {
Expand Down Expand Up @@ -88,8 +86,8 @@ impl IntersectionFstApplier {
match predicate {
Predicate::Range(range) => ranges.push(range.range),
Predicate::RegexMatch(regex) => {
let dfa = DenseDFA::new(&regex.pattern);
let dfa = dfa.context(ParseDFASnafu)?;
let dfa = DFA::new(&regex.pattern);
let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?;
dfas.push(dfa);
}
// Rejection of `InList` predicates is enforced here.
Expand Down Expand Up @@ -210,47 +208,67 @@ mod tests {

#[test]
fn test_intersection_fst_applier_with_valid_pattern() {
let test_fst = FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap();

let applier = create_applier_from_pattern("a.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern("b.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![2]);

let applier = create_applier_from_pattern("c.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![3]);

let applier = create_applier_from_pattern("a.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern("b.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![2]);

let applier = create_applier_from_pattern("c.*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![3]);

let applier = create_applier_from_pattern("d.?").unwrap();
let results = applier.apply(&test_fst);
assert!(results.is_empty());

let applier = create_applier_from_pattern("a.?|b.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1, 2]);

let applier = create_applier_from_pattern("d.?|a.?").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1]);

let applier = create_applier_from_pattern(".*").unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, vec![1, 2, 3]);
let test_fst = FstMap::from_iter([("123", 1), ("abc", 2)]).unwrap();

let cases = vec![
("1", vec![1]),
("2", vec![1]),
("3", vec![1]),
("^1", vec![1]),
("^2", vec![]),
("^3", vec![]),
("^1.*", vec![1]),
("^.*2", vec![1]),
("^.*3", vec![1]),
("1$", vec![]),
("2$", vec![]),
("3$", vec![1]),
("1.*$", vec![1]),
("2.*$", vec![1]),
("3.*$", vec![1]),
("^1..$", vec![1]),
("^.2.$", vec![1]),
("^..3$", vec![1]),
("^[0-9]", vec![1]),
("^[0-9]+$", vec![1]),
("^[0-9][0-9]$", vec![]),
("^[0-9][0-9][0-9]$", vec![1]),
("^123$", vec![1]),
("a", vec![2]),
("b", vec![2]),
("c", vec![2]),
("^a", vec![2]),
("^b", vec![]),
("^c", vec![]),
("^a.*", vec![2]),
("^.*b", vec![2]),
("^.*c", vec![2]),
("a$", vec![]),
("b$", vec![]),
("c$", vec![2]),
("a.*$", vec![2]),
("b.*$", vec![2]),
("c.*$", vec![2]),
("^.[a-z]", vec![2]),
("^abc$", vec![2]),
("^ab$", vec![]),
("abc$", vec![2]),
("^a.c$", vec![2]),
("^..c$", vec![2]),
("ab", vec![2]),
(".*", vec![1, 2]),
("", vec![1, 2]),
("^$", vec![]),
("1|a", vec![1, 2]),
("^123$|^abc$", vec![1, 2]),
("^123$|d", vec![1]),
];

for (pattern, expected) in cases {
let applier = create_applier_from_pattern(pattern).unwrap();
let results = applier.apply(&test_fst);
assert_eq!(results, expected);
}
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ datafusion.workspace = true
datatypes.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
Expand Down
20 changes: 20 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,23 @@ pub enum Error {
#[snafu(source)]
error: parquet::errors::ParquetError,
},

#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound { column: String, location: Location },

#[snafu(display("Failed to build index applier"))]
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
BuildIndexApplier {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -468,6 +485,7 @@ impl ErrorExt for Error {
| InvalidRequest { .. }
| FillDefault { .. }
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }
| InvalidMetadata { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. }
| Join { .. }
Expand Down Expand Up @@ -504,6 +522,8 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
BuildIndexApplier { source, .. } => source.status_code(),
ConvertValue { source, .. } => source.status_code(),
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/row_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl SortField {
}

impl SortField {
fn serialize(&self, serializer: &mut Serializer<&mut Vec<u8>>, value: &ValueRef) -> Result<()> {
pub(crate) fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
macro_rules! cast_value_and_serialize {
(
$self: ident;
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@

pub mod file;
pub mod file_purger;
mod index;
pub mod parquet;
pub(crate) mod version;
18 changes: 18 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(dead_code)]

pub mod applier;
mod codec;
47 changes: 47 additions & 0 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;

use index::inverted_index::search::index_apply::IndexApplier;
use object_store::ObjectStore;

/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub struct SstIndexApplier {
/// The root directory of the region.
region_dir: String,

/// Object store responsible for accessing SST files.
object_store: ObjectStore,

/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
index_applier: Box<dyn IndexApplier>,
}

impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
pub fn new(
region_dir: String,
object_store: ObjectStore,
index_applier: Box<dyn IndexApplier>,
) -> Self {
Self {
region_dir,
object_store,
index_applier,
}
}
}
Loading
Loading