From b58296de22ec774c48e420d86d5d2bb80c82e06f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 28 Dec 2023 14:56:17 +0800 Subject: [PATCH 1/7] feat: Implement OR for PromQL (#3024) * with anit-join Signed-off-by: Ruihang Xia * impl UnionDistinctOn Signed-off-by: Ruihang Xia * unify schema Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * add sqlness case Signed-off-by: Ruihang Xia * add UTs Signed-off-by: Ruihang Xia * Update src/promql/src/planner.rs Co-authored-by: dennis zhuang --------- Signed-off-by: Ruihang Xia Co-authored-by: dennis zhuang --- Cargo.lock | 1 + src/promql/Cargo.toml | 1 + src/promql/src/extension_plan.rs | 4 + .../src/extension_plan/instant_manipulate.rs | 50 +- src/promql/src/extension_plan/planner.rs | 7 +- src/promql/src/extension_plan/test_util.rs | 64 ++ .../src/extension_plan/union_distinct_on.rs | 576 ++++++++++++++++++ src/promql/src/lib.rs | 1 + src/promql/src/planner.rs | 127 +++- .../common/promql/set_operation.result | 148 ++++- .../common/promql/set_operation.sql | 36 +- 11 files changed, 951 insertions(+), 64 deletions(-) create mode 100644 src/promql/src/extension_plan/test_util.rs create mode 100644 src/promql/src/extension_plan/union_distinct_on.rs diff --git a/Cargo.lock b/Cargo.lock index 97e7f48e922e..abe0acb61213 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6541,6 +6541,7 @@ dependencies = [ name = "promql" version = "0.5.0" dependencies = [ + "ahash 0.8.6", "async-recursion", "async-trait", "bytemuck", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index a10973d4ebc1..6be12de4e343 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +ahash.workspace = true async-recursion = "1.0" async-trait.workspace = true bytemuck.workspace = true diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 49a9199bf0cc..ff2195e532ee 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -19,6 +19,9 @@ mod normalize; mod planner; mod range_manipulate; mod series_divide; +#[cfg(test)] +mod test_util; +mod union_distinct_on; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream}; @@ -28,5 +31,6 @@ pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream} pub use planner::PromExtensionPlanner; pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream}; +pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream}; pub(crate) type Millisecond = ::Native; diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index ba155627d2c5..e65592bb374e 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -445,40 +445,12 @@ impl InstantManipulateStream { #[cfg(test)] mod test { - use datafusion::arrow::array::Float64Array; - use datafusion::arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, - }; - use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; - use datatypes::arrow::array::TimestampMillisecondArray; - use datatypes::arrow_array::StringArray; use super::*; - - const TIME_INDEX_COLUMN: &str = 
"timestamp"; - - fn prepare_test_data() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - Field::new("path", DataType::Utf8, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - 180_000, 240_000, // every 60s - 241_000, 271_000, 291_000, // others - ])) as _; - let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; - let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; - let data = RecordBatch::try_new( - schema.clone(), - vec![timestamp_column, field_column, path_column], - ) - .unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } + use crate::extension_plan::test_util::{ + prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN, + }; async fn do_normalize_test( start: Millisecond, @@ -749,22 +721,6 @@ mod test { do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await; } - fn prepare_test_data_with_nan() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - ])) as _; - let field_column = - Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; - let data = - RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } - #[tokio::test] async fn lookback_10s_interval_10s_with_nan() { let expected = String::from( diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 7798c9b32193..80cd565bd20a 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use super::HistogramFold; +use super::{HistogramFold, UnionDistinctOn}; use crate::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -50,6 +50,11 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(session_state, planner)?)) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan( + physical_inputs[0].clone(), + physical_inputs[1].clone(), + ))) } else { Ok(None) } diff --git a/src/promql/src/extension_plan/test_util.rs b/src/promql/src/extension_plan/test_util.rs new file mode 100644 index 000000000000..f751cb9fa84b --- /dev/null +++ b/src/promql/src/extension_plan/test_util.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utils for testing extension plan + +use std::sync::Arc; + +use common_recordbatch::DfRecordBatch as RecordBatch; +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, +}; +use datafusion::physical_plan::memory::MemoryExec; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow_array::StringArray; + +pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp"; + +pub(crate) fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + Field::new("path", DataType::Utf8, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; + let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, field_column, path_column], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} + +pub(crate) fn prepare_test_data_with_nan() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; + let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs new file mode 100644 index 000000000000..22551b73f810 --- /dev/null +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -0,0 +1,576 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use ahash::{HashMap, RandomState};
+use datafusion::arrow::array::UInt64Array;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::DFSchemaRef;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::execution::context::TaskContext;
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{
+    hash_utils, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+use datatypes::arrow::compute;
+use futures::future::BoxFuture;
+use futures::{ready, Stream, StreamExt, TryStreamExt};
+
+/// A special kind of `UNION` (`OR` in PromQL) operator, for PromQL-specific use cases.
+///
+/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The
+/// main difference is that it treats the left child and the right child differently:
+/// - All columns from the left child are output.
+/// - Collisions (i.e., rows that are not distinct) are only checked on the columns specified by `compare_keys`.
+/// - When there is a collision:
+///   - If the collision is within the right child itself, only the first observed row is
+///     preserved. All others are discarded.
+///   - If the collision is with the left child, the row in the right child is discarded.
+/// - The output order is not maintained. This plan outputs the left child first, then the right child.
+/// - The output schema contains all columns from the left and right child plans.
+///
+/// From the implementation perspective, this operator is similar to `HashJoin`, but the
+/// probe side is the right child, and the build side is the left child. Another difference
+/// is that the probe is opt-out: probed (matched) entries are removed from the hash table
+/// rather than emitted.
+///
+/// This plan exhausts the right child first to build the probe hash table, then streams the
+/// left side and uses it to "mask" the hash table.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct UnionDistinctOn {
+    left: LogicalPlan,
+    right: LogicalPlan,
+    /// The columns to compare for equality.
+    /// TIME INDEX is included.
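+    /// For example, `t1 or on(job) t2` compares on `job` (plus the time index);
+    /// without an `on`/`ignoring` modifier, all tag columns are compared.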
+ compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, +} + +impl UnionDistinctOn { + pub fn name() -> &'static str { + "UnionDistinctOn" + } + + pub fn new( + left: LogicalPlan, + right: LogicalPlan, + compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, + ) -> Self { + Self { + left, + right, + compare_keys, + ts_col, + output_schema, + } + } + + pub fn to_execution_plan( + &self, + left_exec: Arc, + right_exec: Arc, + ) -> Arc { + Arc::new(UnionDistinctOnExec { + left: left_exec, + right: right_exec, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: Arc::new(self.output_schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + random_state: RandomState::new(), + }) + } +} + +impl UserDefinedLogicalNodeCore for UnionDistinctOn { + fn name(&self) -> &str { + Self::name() + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.left, &self.right] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UnionDistinctOn: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { + assert_eq!(inputs.len(), 2); + + let left = inputs[0].clone(); + let right = inputs[1].clone(); + Self { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + } + } +} + +#[derive(Debug)] +pub struct UnionDistinctOnExec { + left: Arc, + right: Arc, + compare_keys: Vec, + ts_col: String, + output_schema: SchemaRef, + metric: ExecutionPlanMetricsSet, + + /// Shared the `RandomState` for the hashing algorithm + random_state: RandomState, +} + +impl ExecutionPlan for UnionDistinctOnExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition, Distribution::SinglePartition] + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + /// [UnionDistinctOnExec] will output left first, then right. + /// So the order of the output is not maintained. + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + assert_eq!(children.len(), 2); + + let left = children[0].clone(); + let right = children[1].clone(); + Ok(Arc::new(UnionDistinctOnExec { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + metric: self.metric.clone(), + random_state: self.random_state.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let left_stream = self.left.execute(partition, context.clone())?; + let right_stream = self.right.execute(partition, context.clone())?; + + // Convert column name to column index. Add one for the time column. 
+ let mut key_indices = Vec::with_capacity(self.compare_keys.len() + 1); + for key in &self.compare_keys { + let index = self + .output_schema + .column_with_name(key) + .map(|(i, _)| i) + .ok_or_else(|| DataFusionError::Internal(format!("Column {} not found", key)))?; + key_indices.push(index); + } + let ts_index = self + .output_schema + .column_with_name(&self.ts_col) + .map(|(i, _)| i) + .ok_or_else(|| { + DataFusionError::Internal(format!("Column {} not found", self.ts_col)) + })?; + key_indices.push(ts_index); + + // Build right hash table future. + let hashed_data_future = HashedDataFut::Pending(Box::pin(HashedData::new( + right_stream, + self.random_state.clone(), + key_indices.clone(), + ))); + + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(UnionDistinctOnStream { + left: left_stream, + right: hashed_data_future, + compare_keys: key_indices, + output_schema: self.output_schema.clone(), + metric: baseline_metric, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +impl DisplayAs for UnionDistinctOnExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnionDistinctOnExec: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + } + } +} + +// TODO(ruihang): some unused fields are for metrics, which will be implemented later. +#[allow(dead_code)] +pub struct UnionDistinctOnStream { + left: SendableRecordBatchStream, + right: HashedDataFut, + /// Include time index + compare_keys: Vec, + output_schema: SchemaRef, + metric: BaselineMetrics, +} + +impl UnionDistinctOnStream { + fn poll_impl(&mut self, cx: &mut Context<'_>) -> Poll::Item>> { + // resolve the right stream + let right = match self.right { + HashedDataFut::Pending(ref mut fut) => { + let right = ready!(fut.as_mut().poll(cx))?; + self.right = HashedDataFut::Ready(right); + let HashedDataFut::Ready(right_ref) = &mut self.right else { + unreachable!() + }; + right_ref + } + HashedDataFut::Ready(ref mut right) => right, + HashedDataFut::Empty => return Poll::Ready(None), + }; + + // poll left and probe with right + let next_left = ready!(self.left.poll_next_unpin(cx)); + match next_left { + Some(Ok(left)) => { + // observe left batch and return it + right.update_map(&left)?; + Poll::Ready(Some(Ok(left))) + } + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => { + // left stream is exhausted, so we can send the right part + let right = std::mem::replace(&mut self.right, HashedDataFut::Empty); + let HashedDataFut::Ready(data) = right else { + unreachable!() + }; + Poll::Ready(Some(data.finish())) + } + } + } +} + +impl RecordBatchStream for UnionDistinctOnStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +impl Stream for UnionDistinctOnStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_impl(cx) + } +} + +/// Simple future state for [HashedData] +enum HashedDataFut { + /// The result is not ready + Pending(BoxFuture<'static, DataFusionResult>), + /// The result is ready + Ready(HashedData), + /// The result is taken + Empty, +} + +/// ALL input batches and its hash table +struct HashedData { + // TODO(ruihang): use `JoinHashMap` instead after upgrading to DF 34.0 + /// Hash table for all input batches. 
The key is the hash value, and the value
+    /// is the index of the row in `batch`.
+    hash_map: HashMap<u64, usize>,
+    /// Output batch.
+    batch: RecordBatch,
+    /// The indices of the columns to be hashed.
+    hash_key_indices: Vec<usize>,
+    random_state: RandomState,
+}
+
+impl HashedData {
+    pub async fn new(
+        input: SendableRecordBatchStream,
+        random_state: RandomState,
+        hash_key_indices: Vec<usize>,
+    ) -> DataFusionResult<Self> {
+        // Collect all batches from the input stream
+        let initial = (Vec::new(), 0);
+        let (batches, _num_rows) = input
+            .try_fold(initial, |mut acc, batch| async {
+                // Update rowcount
+                acc.1 += batch.num_rows();
+                // Push batch to output
+                acc.0.push(batch);
+                Ok(acc)
+            })
+            .await?;
+
+        // Create hash for each batch
+        let mut hash_map = HashMap::default();
+        let mut hashes_buffer = Vec::new();
+        let mut interleave_indices = Vec::new();
+        for (batch_number, batch) in batches.iter().enumerate() {
+            hashes_buffer.resize(batch.num_rows(), 0);
+            // get columns for hashing
+            let arrays = hash_key_indices
+                .iter()
+                .map(|i| batch.column(*i).clone())
+                .collect::<Vec<_>>();
+
+            // compute hash
+            let hash_values =
+                hash_utils::create_hashes(&arrays, &random_state, &mut hashes_buffer)?;
+            for (row_number, hash_value) in hash_values.iter().enumerate() {
+                // Only keep the first observed row for each hash value
+                if hash_map
+                    .try_insert(*hash_value, interleave_indices.len())
+                    .is_ok()
+                {
+                    interleave_indices.push((batch_number, row_number));
+                }
+            }
+        }
+
+        // Finalize the output batch
+        let batch = interleave_batches(batches, interleave_indices)?;
+
+        Ok(Self {
+            hash_map,
+            batch,
+            hash_key_indices,
+            random_state,
+        })
+    }
+
+    /// Removes rows whose hash values are present in the input
+    /// record batch from the hash map.
+    pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> {
+        // get columns for hashing
+        let mut hashes_buffer = Vec::new();
+        let arrays = self
+            .hash_key_indices
+            .iter()
+            .map(|i| input.column(*i).clone())
+            .collect::<Vec<_>>();
+
+        // compute hash
+        hashes_buffer.resize(input.num_rows(), 0);
+        let hash_values =
+            hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?;
+
+        // remove those hashes
+        for hash in hash_values {
+            self.hash_map.remove(hash);
+        }
+
+        Ok(())
+    }
+
+    pub fn finish(self) -> DataFusionResult<RecordBatch> {
+        let valid_indices = self.hash_map.values().copied().collect::<Vec<_>>();
+        let result = take_batch(&self.batch, &valid_indices)?;
+        Ok(result)
+    }
+}
+
+/// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave)
+fn interleave_batches(
+    batches: Vec<RecordBatch>,
+    indices: Vec<(usize, usize)>,
+) -> DataFusionResult<RecordBatch> {
+    let schema = batches[0].schema();
+
+    // transform batches into arrays
+    let mut arrays = vec![vec![]; schema.fields().len()];
+    for batch in &batches {
+        for (i, array) in batch.columns().iter().enumerate() {
+            arrays[i].push(array.as_ref());
+        }
+    }
+
+    // interleave arrays
+    let mut interleaved_arrays = Vec::with_capacity(arrays.len());
+    for array in arrays {
+        interleaved_arrays.push(compute::interleave(&array, &indices)?);
+    }
+
+    // assemble new record batch
+    RecordBatch::try_new(schema.clone(), interleaved_arrays).map_err(DataFusionError::ArrowError)
+}
+
+/// Utility function to take rows from a record batch.
Based on [take](datafusion::arrow::compute::take) +fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult { + // fast path + if batch.num_rows() == indices.len() { + return Ok(batch.clone()); + } + + let schema = batch.schema(); + + let indices_array = UInt64Array::from_iter(indices.iter().map(|i| *i as u64)); + let arrays = batch + .columns() + .iter() + .map(|array| compute::take(array, &indices_array, None)) + .collect::, _>>() + .map_err(DataFusionError::ArrowError)?; + + let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?; + Ok(result) +} + +#[cfg(test)] +mod test { + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + + use super::*; + + #[test] + fn test_interleave_batches() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + let batch1 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + ], + ) + .unwrap(); + + let batch2 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(Int32Array::from(vec![10, 11, 12])), + ], + ) + .unwrap(); + + let batch3 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![13, 14, 15])), + Arc::new(Int32Array::from(vec![16, 17, 18])), + ], + ) + .unwrap(); + + let batches = vec![batch1, batch2, batch3]; + let indices = vec![(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)]; + let result = interleave_batches(batches, indices).unwrap(); + + let expected = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 7, 13, 2, 8, 14])), + Arc::new(Int32Array::from(vec![4, 10, 16, 5, 11, 17])), + ], + ) + .unwrap(); + + assert_eq!(result, expected); + } + + #[test] + fn test_take_batch() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + ], + ) + .unwrap(); + + let indices = vec![0, 2]; + let result = take_batch(&batch, &indices).unwrap(); + + let expected = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 3])), + Arc::new(Int32Array::from(vec![4, 6])), + ], + ) + .unwrap(); + + assert_eq!(result, expected); + } +} diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 9514a015380b..127bf45d5f1a 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -14,6 +14,7 @@ #![feature(option_get_or_insert_default)] #![feature(let_chains)] +#![feature(map_try_insert)] pub mod error; pub mod extension_plan; diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 137035755bd5..7c8176d7b95e 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -51,7 +51,7 @@ use crate::error::{ }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, - RangeManipulate, SeriesDivide, SeriesNormalize, + RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn, }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, @@ -1489,6 +1489,7 @@ impl PromPlanner { .context(DataFusionPlanningSnafu) } + /// Build a set operator (AND/OR/UNLESS) 
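+    /// `and`/`unless` are planned as semi/anti joins on the tag columns plus the time
+    /// index, while `or` is delegated to [`UnionDistinctOn`] via `or_operator`.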
fn set_op_on_non_field_columns( &self, left: LogicalPlan, @@ -1501,6 +1502,10 @@ impl PromPlanner { let mut left_tag_col_set = left_tag_cols.into_iter().collect::>(); let mut right_tag_col_set = right_tag_cols.into_iter().collect::>(); + if matches!(op.id(), token::T_LOR) { + return self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier); + } + // apply modifier if let Some(modifier) = modifier { // one-to-many and many-to-one are not supported @@ -1545,7 +1550,8 @@ impl PromPlanner { ) }; let join_keys = left_tag_col_set - .into_iter() + .iter() + .cloned() .chain([self.ctx.time_index_column.clone().unwrap()]) .collect::>(); @@ -1579,17 +1585,122 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu), token::T_LOR => { - // `OR` can not be expressed by `UNION` precisely. - // it will generate unexpceted result when schemas don't match - UnsupportedExprSnafu { - name: "set operation `OR`", - } - .fail() + self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier) } _ => UnexpectedTokenSnafu { token: op }.fail(), } } + // TODO(ruihang): change function name + fn or_operator( + &self, + left: LogicalPlan, + right: LogicalPlan, + left_tag_cols_set: HashSet, + right_tag_cols_set: HashSet, + modifier: &Option, + ) -> Result { + // prepare hash sets + let all_tags = left_tag_cols_set + .union(&right_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_left = all_tags + .difference(&left_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_right = all_tags + .difference(&right_tag_cols_set) + .cloned() + .collect::>(); + let left_qualifier = left.schema().field(0).qualifier().cloned(); + let right_qualifier = right.schema().field(0).qualifier().cloned(); + let left_qualifier_string = left_qualifier + .as_ref() + .map(|l| l.to_string()) + .unwrap_or_default(); + let right_qualifier_string = right_qualifier + .as_ref() + .map(|r| r.to_string()) + .unwrap_or_default(); + + // step 0: fill all columns in output schema + let all_columns_set = left + .schema() + .fields() + .iter() + .chain(right.schema().fields().iter()) + .map(|field| field.name().clone()) + .collect::>(); + let mut all_columns = all_columns_set.into_iter().collect::>(); + // sort to ensure the generated schema is not volatile + all_columns.sort_unstable(); + + // step 1: align schema using project, fill non-exist columns with null + let left_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_left.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) + } else { + DfExpr::Column(Column::new(left_qualifier.clone(), col)) + } + }); + let right_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_right.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) + } else { + DfExpr::Column(Column::new(right_qualifier.clone(), col)) + } + }); + + let left_projected = LogicalPlanBuilder::from(left) + .project(left_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(left_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + let right_projected = LogicalPlanBuilder::from(right) + .project(right_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(right_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? 
+ .build() + .context(DataFusionPlanningSnafu)?; + + // step 2: compute match columns + let mut match_columns = if let Some(modifier) = modifier + && let Some(matching) = &modifier.matching + { + match matching { + // keeps columns mentioned in `on` + LabelModifier::Include(on) => on.labels.clone(), + // removes columns memtioned in `ignoring` + LabelModifier::Exclude(ignoring) => { + let ignoring = ignoring.labels.iter().cloned().collect::>(); + all_tags.difference(&ignoring).cloned().collect() + } + } + } else { + all_tags.iter().cloned().collect() + }; + // sort to ensure the generated plan is not volatile + match_columns.sort_unstable(); + // step 3: build `UnionDistinctOn` plan + let schema = left_projected.schema().clone(); + let union_distinct_on = UnionDistinctOn::new( + left_projected, + right_projected, + match_columns, + self.ctx.time_index_column.clone().unwrap(), + schema, + ); + let result = LogicalPlan::Extension(Extension { + node: Arc::new(union_distinct_on), + }); + + Ok(result) + } + /// Build a projection that project and perform operation expr for every value columns. /// Non-value columns (tag and timestamp) will be preserved in the projection. /// diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index d14b6fe88bc1..15a7a865a317 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -130,10 +130,21 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` ++------------+----------+-----+---------------------+-------+ +| g | instance | job | ts | val | ++------------+----------+-----+---------------------+-------+ +| canary | 0 | api | 1970-01-01T00:50:00 | 300.0 | +| canary | 0 | app | 1970-01-01T00:50:00 | 700.0 | +| canary | 1 | api | 1970-01-01T00:50:00 | 400.0 | +| canary | 1 | app | 1970-01-01T00:50:00 | 800.0 | +| production | 0 | api | 1970-01-01T00:50:00 | 100.0 | +| production | 0 | app | 1970-01-01T00:50:00 | 500.0 | +| production | 1 | api | 1970-01-01T00:50:00 | 200.0 | +| production | 1 | app | 1970-01-01T00:50:00 | 600.0 | ++------------+----------+-----+---------------------+-------+ -- # On overlap the rhs samples must be dropped. -- eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"} @@ -143,10 +154,10 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.val. 
Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "val + Float64(1)". -- # Matching only on instance excludes everything that has instance=0/1 but includes -- # entries without the instance label. @@ -161,7 +172,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) -- {group="canary", instance="0", job="api-server"} 301 @@ -174,7 +185,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} -- http_requests{group="canary", instance="1", job="api-server"} 400 @@ -268,3 +279,128 @@ drop table vector_matching_a; Affected Rows: 0 +-- the following cases are not from Prometheus. +create table t1 (ts timestamp time index, job string primary key, val double); + +Affected Rows: 0 + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +Affected Rows: 4 + +create table t2 (ts timestamp time index, val double); + +Affected Rows: 0 + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +Affected Rows: 7 + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | 
++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +drop table t1; + +Affected Rows: 0 + +drop table t2; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/set_operation.sql b/tests/cases/standalone/common/promql/set_operation.sql index e91460df3478..6a71711bd896 100644 --- a/tests/cases/standalone/common/promql/set_operation.sql +++ b/tests/cases/standalone/common/promql/set_operation.sql @@ -79,7 +79,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -- # On overlap the rhs samples must be dropped. @@ -90,7 +90,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; @@ -173,3 +173,35 @@ drop table http_requests; drop table cpu_count; drop table vector_matching_a; + +-- the following cases are not from Prometheus. 
+ +create table t1 (ts timestamp time index, job string primary key, val double); + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +create table t2 (ts timestamp time index, val double); + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + +drop table t1; + +drop table t2; From ff2de5b247b0b496b9fa404ad6386d7b48b67567 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 28 Dec 2023 10:11:05 +0000 Subject: [PATCH 2/7] feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1) Signed-off-by: Zhenchi --- Cargo.lock | 13 +- Cargo.toml | 3 +- src/index/src/inverted_index/error.rs | 2 +- .../search/fst_apply/intersection_apply.rs | 112 ++++---- src/mito2/Cargo.toml | 1 + src/mito2/src/error.rs | 12 + src/mito2/src/row_converter.rs | 6 +- src/mito2/src/sst.rs | 1 + src/mito2/src/sst/index.rs | 16 ++ src/mito2/src/sst/index/applier.rs | 42 +++ src/mito2/src/sst/index/applier/builder.rs | 258 ++++++++++++++++++ .../src/sst/index/applier/builder/between.rs | 171 ++++++++++++ src/mito2/src/sst/index/codec.rs | 65 +++++ 13 files changed, 651 insertions(+), 51 deletions(-) create mode 100644 src/mito2/src/sst/index.rs create mode 100644 src/mito2/src/sst/index/applier.rs create mode 100644 src/mito2/src/sst/index/applier/builder.rs create mode 100644 src/mito2/src/sst/index/applier/builder/between.rs create mode 100644 src/mito2/src/sst/index/codec.rs diff --git a/Cargo.lock b/Cargo.lock index abe0acb61213..b22b67875cc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4029,7 +4029,7 @@ dependencies = [ "prost 0.12.3", "rand", "regex", - "regex-automata 0.1.10", + "regex-automata 0.2.0", "snafu", "tokio", "tokio-util", @@ -4977,6 +4977,7 @@ dependencies = [ "datatypes", "futures", "humantime-serde", + "index", "lazy_static", "log-store", "memcomparable", @@ -7134,8 +7135,18 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9368763f5a9b804326f3af749e16f9abf378d227bcdee7634b13d8f17793782" dependencies = [ "fst", + "memchr", "regex-syntax 0.6.29", ] diff --git a/Cargo.toml b/Cargo.toml index 0e38d914eccb..a3413aa9d48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,7 +111,7 @@ prost = "0.12" raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } rand = "0.8" regex = "1.8" -regex-automata = { version = "0.1", features = ["transducer"] } +regex-automata = { version = "0.2", features = ["transducer"] } reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls-native-roots", @@ -169,6 +169,7 @@ datanode = { path = "src/datanode" } datatypes = { path = "src/datatypes" } file-engine = { path = "src/file-engine" } frontend = { path = 
"src/frontend" } +index = { path = "src/index" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index b795e33003b7..6e5f39006eb9 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -113,7 +113,7 @@ pub enum Error { #[snafu(display("Failed to parse regex DFA"))] ParseDFA { #[snafu(source)] - error: regex_automata::Error, + error: Box, location: Location, }, diff --git a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs index a0ae0d7b9afb..a608acd0bab5 100644 --- a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs @@ -14,7 +14,7 @@ use fst::map::OpBuilder; use fst::{IntoStreamer, Streamer}; -use regex_automata::DenseDFA; +use regex_automata::dfa::dense::DFA; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ @@ -24,15 +24,13 @@ use crate::inverted_index::search::fst_apply::FstApplier; use crate::inverted_index::search::predicate::{Predicate, Range}; use crate::inverted_index::FstMap; -type Dfa = DenseDFA, usize>; - /// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns. pub struct IntersectionFstApplier { /// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap. ranges: Vec, /// A list of `Dfa` compiled from regular expression patterns. - dfas: Vec, + dfas: Vec>>, } impl FstApplier for IntersectionFstApplier { @@ -88,8 +86,8 @@ impl IntersectionFstApplier { match predicate { Predicate::Range(range) => ranges.push(range.range), Predicate::RegexMatch(regex) => { - let dfa = DenseDFA::new(®ex.pattern); - let dfa = dfa.context(ParseDFASnafu)?; + let dfa = DFA::new(®ex.pattern); + let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?; dfas.push(dfa); } // Rejection of `InList` predicates is enforced here. 
@@ -210,47 +208,67 @@ mod tests { #[test] fn test_intersection_fst_applier_with_valid_pattern() { - let test_fst = FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap(); - - let applier = create_applier_from_pattern("a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("a.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("d.?").unwrap(); - let results = applier.apply(&test_fst); - assert!(results.is_empty()); - - let applier = create_applier_from_pattern("a.?|b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1, 2]); - - let applier = create_applier_from_pattern("d.?|a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern(".*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1, 2, 3]); + let test_fst = FstMap::from_iter([("123", 1), ("abc", 2)]).unwrap(); + + let cases = vec![ + ("1", vec![1]), + ("2", vec![1]), + ("3", vec![1]), + ("^1", vec![1]), + ("^2", vec![]), + ("^3", vec![]), + ("^1.*", vec![1]), + ("^.*2", vec![1]), + ("^.*3", vec![1]), + ("1$", vec![]), + ("2$", vec![]), + ("3$", vec![1]), + ("1.*$", vec![1]), + ("2.*$", vec![1]), + ("3.*$", vec![1]), + ("^1..$", vec![1]), + ("^.2.$", vec![1]), + ("^..3$", vec![1]), + ("^[0-9]", vec![1]), + ("^[0-9]+$", vec![1]), + ("^[0-9][0-9]$", vec![]), + ("^[0-9][0-9][0-9]$", vec![1]), + ("^123$", vec![1]), + ("a", vec![2]), + ("b", vec![2]), + ("c", vec![2]), + ("^a", vec![2]), + ("^b", vec![]), + ("^c", vec![]), + ("^a.*", vec![2]), + ("^.*b", vec![2]), + ("^.*c", vec![2]), + ("a$", vec![]), + ("b$", vec![]), + ("c$", vec![2]), + ("a.*$", vec![2]), + ("b.*$", vec![2]), + ("c.*$", vec![2]), + ("^.[a-z]", vec![2]), + ("^abc$", vec![2]), + ("^ab$", vec![]), + ("abc$", vec![2]), + ("^a.c$", vec![2]), + ("^..c$", vec![2]), + ("ab", vec![2]), + (".*", vec![1, 2]), + ("", vec![1, 2]), + ("^$", vec![]), + ("1|a", vec![1, 2]), + ("^123$|^abc$", vec![1, 2]), + ("^123$|d", vec![1]), + ]; + + for (pattern, expected) in cases { + let applier = create_applier_from_pattern(pattern).unwrap(); + let results = applier.apply(&test_fst); + assert_eq!(results, expected); + } } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 8c3ef50ec2c7..a28e4f0426ea 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -39,6 +39,7 @@ datafusion.workspace = true datatypes.workspace = true futures.workspace = true humantime-serde.workspace = true +index.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 39457281d76b..a54956442596 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -423,6 +423,16 @@ pub enum Error { #[snafu(source)] error: 
parquet::errors::ParquetError, }, + + #[snafu(display("Column not found, column: {column}"))] + ColumnNotFound { column: String, location: Location }, + + #[snafu(display("Failed to build index applier"))] + BuildIndexApplier { + #[snafu(source)] + error: index::inverted_index::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -468,6 +478,8 @@ impl ErrorExt for Error { | InvalidRequest { .. } | FillDefault { .. } | ConvertColumnDataType { .. } + | ColumnNotFound { .. } + | BuildIndexApplier { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 4cc6fd3274ac..33ef05433521 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -84,7 +84,11 @@ impl SortField { } impl SortField { - fn serialize(&self, serializer: &mut Serializer<&mut Vec>, value: &ValueRef) -> Result<()> { + pub(crate) fn serialize( + &self, + serializer: &mut Serializer<&mut Vec>, + value: &ValueRef, + ) -> Result<()> { macro_rules! cast_value_and_serialize { ( $self: ident; diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 32c7b4951a55..55939c2d246a 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -16,5 +16,6 @@ pub mod file; pub mod file_purger; +mod index; pub mod parquet; pub(crate) mod version; diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs new file mode 100644 index 000000000000..34ccd1d0f71f --- /dev/null +++ b/src/mito2/src/sst/index.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod applier; +mod codec; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs new file mode 100644 index 000000000000..d4c9350caa4f --- /dev/null +++ b/src/mito2/src/sst/index/applier.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
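+
+//! [`SstIndexApplier`] bundles the region directory, object store and an [`IndexApplier`]
+//! used to apply inverted index predicates during scan; see [`builder`] for how they are built.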
+ +pub mod builder; + +use std::sync::Arc; + +use index::inverted_index::search::index_apply::IndexApplier; +use object_store::ObjectStore; + +#[allow(dead_code)] +#[derive(Clone)] +pub struct SstIndexApplier { + region_dir: String, + object_store: ObjectStore, + index_applier: Arc, +} + +impl SstIndexApplier { + pub fn new( + region_dir: String, + object_store: ObjectStore, + index_applier: Arc, + ) -> Self { + Self { + region_dir, + object_store, + index_applier, + } + } +} diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs new file mode 100644 index 000000000000..5034c757caee --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -0,0 +1,258 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod between; + +// TODO(zhongzc): This PR is too large. The following modules are comming soon. + +// mod comparison; +// mod eq_list; +// mod in_list; +// mod regex_match; + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use common_query::logical_plan::Expr; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr as DfExpr; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use index::inverted_index::search::index_apply::PredicatesIndexApplier; +use index::inverted_index::search::predicate::Predicate; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; + +use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result}; +use crate::row_converter::SortField; +use crate::sst::index::applier::SstIndexApplier; +use crate::sst::index::codec::IndexValueCodec; + +type ColumnName = String; + +/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. +pub struct SstIndexApplierBuilder<'a> { + /// Directory of the region, required argument for constructing [`SstIndexApplier`]. + region_dir: String, + + /// Object store, required argument for constructing [`SstIndexApplier`]. + object_store: ObjectStore, + + /// Metadata of the region, used to get metadata like column type. + metadata: &'a RegionMetadata, + + /// Stores predicates during traversal on the Expr tree. + output: HashMap>, +} + +impl<'a> SstIndexApplierBuilder<'a> { + /// Creates a new [`SstIndexApplierBuilder`]. + #[allow(dead_code)] + pub fn new( + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + ) -> Self { + Self { + region_dir, + object_store, + metadata, + output: HashMap::default(), + } + } + + /// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on + /// the expressions provided. If no predicates match, returns `None`. 
+ #[allow(dead_code)] + pub fn build(mut self, exprs: &[Expr]) -> Result> { + for expr in exprs { + self.traverse_and_collect(expr.df_expr())?; + } + + if self.output.is_empty() { + return Ok(None); + } + + let predicates = self.output.into_iter().collect(); + let applier = PredicatesIndexApplier::try_from(predicates); + Ok(Some(SstIndexApplier::new( + self.region_dir, + self.object_store, + Arc::new(applier.context(BuildIndexApplierSnafu)?), + ))) + } + + /// Recursively traverses expressions to collect predicates. + /// Results are stored in `self.output`. + fn traverse_and_collect(&mut self, expr: &DfExpr) -> Result<()> { + match expr { + DfExpr::Between(between) => self.collect_between(between), + + // TODO(zhongzc): This PR is too large. The following arms are comming soon. + + // DfExpr::InList(in_list) => self.collect_inlist(in_list), + // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + // Operator::And => { + // self.traverse_and_collect(left)?; + // self.traverse_and_collect(right) + // } + // Operator::Or => self.collect_or_eq_list(left, right), + // Operator::Eq => self.collect_eq(left, right), + // Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + // self.collect_comparison_expr(left, op, right) + // } + // Operator::RegexMatch => self.collect_regex_match(left, right), + // _ => Ok(()), + // }, + + // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ... + _ => Ok(()), + } + } + + /// Helper function to add a predicate to the output. + fn add_predicate(&mut self, column_name: &str, predicate: Predicate) { + match self.output.get_mut(column_name) { + Some(predicates) => predicates.push(predicate), + None => { + self.output.insert(column_name.to_string(), vec![predicate]); + } + } + } + + /// Helper function to get the column type of a tag column. + /// Returns `None` if the column is not a tag column. + fn tag_column_type(&self, column_name: &str) -> Result> { + let column = self + .metadata + .column_by_name(column_name) + .context(ColumnNotFoundSnafu { + column: column_name, + })?; + + Ok((column.semantic_type == SemanticType::Tag) + .then(|| column.column_schema.data_type.clone())) + } + + /// Helper funtion to get a non-null literal. + fn nonnull_lit(expr: &DfExpr) -> Option<&ScalarValue> { + match expr { + DfExpr::Literal(lit) if !lit.is_null() => Some(lit), + _ => None, + } + } + + /// Helper function to get the column name of a column expression. + fn column_name(expr: &DfExpr) -> Option<&str> { + match expr { + DfExpr::Column(column) => Some(&column.name), + _ => None, + } + } + + /// Helper function to encode a literal into bytes. 
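+    /// The literal is converted to a [`Value`] and encoded via [`IndexValueCodec`] with a
+    /// [`SortField`] of the column's data type.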
+ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { + let value = Value::try_from(lit.clone()).unwrap(); + let mut bytes = vec![]; + let field = SortField::new(data_type); + IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?; + Ok(bytes) + } +} + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datafusion_common::Column; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use object_store::ObjectStore; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + pub(crate) fn test_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() + } + + pub(crate) fn test_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + pub(crate) fn tag_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "a".to_string(), + }) + } + + pub(crate) fn field_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "b".to_string(), + }) + } + + pub(crate) fn nonexistent_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "nonexistent".to_string(), + }) + } + + pub(crate) fn string_lit(s: impl Into) -> DfExpr { + DfExpr::Literal(ScalarValue::Utf8(Some(s.into()))) + } + + pub(crate) fn int64_lit(i: impl Into) -> DfExpr { + DfExpr::Literal(ScalarValue::Int64(Some(i.into()))) + } + + pub(crate) fn encoded_string(s: impl Into) -> Vec { + let mut bytes = vec![]; + IndexValueCodec::encode_value( + Value::from(s.into()).as_value_ref(), + &SortField::new(ConcreteDataType::string_datatype()), + &mut bytes, + ) + .unwrap(); + bytes + } +} diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs new file mode 100644 index 000000000000..50ae7073b2db --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use datafusion_expr::Between; +use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`. + pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> { + if between.negated { + return Ok(()); + } + + let Some(column_name) = Self::column_name(&between.expr) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? else { + return Ok(()); + }; + let Some(low) = Self::nonnull_lit(&between.low) else { + return Ok(()); + }; + let Some(high) = Self::nonnull_lit(&between.high) else { + return Ok(()); + }; + + let predicate = Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: Self::encode_lit(low, data_type.clone())?, + }), + upper: Some(Bound { + inclusive: true, + value: Self::encode_lit(high, data_type)?, + }), + }, + }); + + self.add_predicate(column_name, predicate); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_between_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: encoded_string("abc"), + }), + upper: Some(Bound { + inclusive: true, + value: encoded_string("def"), + }), + } + }) + ); + } + + #[test] + fn test_collect_between_negated() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: true, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(field_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(int64_lit(123)), + high: Box::new(int64_lit(456)), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. 
}))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(nonexistent_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs new file mode 100644 index 000000000000..ada5ac07cbfc --- /dev/null +++ b/src/mito2/src/sst/index/codec.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::value::ValueRef; +use memcomparable::Serializer; + +use crate::error::Result; +use crate::row_converter::SortField; + +/// Encodes index values according to their data types for sorting and storage use. +pub struct IndexValueCodec; + +impl IndexValueCodec { + /// Serializes a `ValueRef` using the data type defined in `SortField` and writes + /// the result into a buffer. + /// + /// # Arguments + /// * `value` - The value to be encoded. + /// * `field` - Contains data type to guide serialization. + /// * `buffer` - Destination buffer for the serialized value. + pub fn encode_value(value: ValueRef, field: &SortField, buffer: &mut Vec) -> Result<()> { + buffer.reserve(field.estimated_size()); + let mut serializer = Serializer::new(buffer); + field.serialize(&mut serializer, &value) + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + + use super::*; + use crate::error::Error; + + #[test] + fn test_encode_value_basic() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::string_datatype()); + + let mut buffer = Vec::new(); + IndexValueCodec::encode_value(value, &field, &mut buffer).unwrap(); + assert!(!buffer.is_empty()); + } + + #[test] + fn test_encode_value_type_mismatch() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::int64_datatype()); + + let mut buffer = Vec::new(); + let res = IndexValueCodec::encode_value(value, &field, &mut buffer); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. 
}))); + } +} From 55fc1f9583e3d5e5938c0b46abf53317e4ac1d56 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 28 Dec 2023 10:19:24 +0000 Subject: [PATCH 3/7] chore: add docs Signed-off-by: Zhenchi --- src/mito2/src/sst/index.rs | 2 ++ src/mito2/src/sst/index/applier.rs | 11 +++++++++-- src/mito2/src/sst/index/applier/builder.rs | 2 -- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 34ccd1d0f71f..baffda27aa6e 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,5 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(dead_code)] + pub mod applier; mod codec; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index d4c9350caa4f..564a20d33c0f 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -19,15 +19,22 @@ use std::sync::Arc; use index::inverted_index::search::index_apply::IndexApplier; use object_store::ObjectStore; -#[allow(dead_code)] -#[derive(Clone)] +/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files +/// and returning the relevant row group ids for further scan. pub struct SstIndexApplier { + /// The root directory of the region. region_dir: String, + + /// Object store responsible for accessing SST files. object_store: ObjectStore, + + /// Predifined index applier used to apply predicates to index files + /// and return the relevant row group ids for further scan. index_applier: Arc, } impl SstIndexApplier { + /// Creates a new [`SstIndexApplier`]. pub fn new( region_dir: String, object_store: ObjectStore, diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 5034c757caee..6ecd0b0e5d5c 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -60,7 +60,6 @@ pub struct SstIndexApplierBuilder<'a> { impl<'a> SstIndexApplierBuilder<'a> { /// Creates a new [`SstIndexApplierBuilder`]. - #[allow(dead_code)] pub fn new( region_dir: String, object_store: ObjectStore, @@ -76,7 +75,6 @@ impl<'a> SstIndexApplierBuilder<'a> { /// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on /// the expressions provided. If no predicates match, returns `None`. - #[allow(dead_code)] pub fn build(mut self, exprs: &[Expr]) -> Result> { for expr in exprs { self.traverse_and_collect(expr.df_expr())?; From 09639a7412228a3bc14d7f30f7c7bcc417a83f63 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 28 Dec 2023 10:36:03 +0000 Subject: [PATCH 4/7] fix: typos Signed-off-by: Zhenchi --- src/common/config/src/wal/kafka.rs | 2 +- src/mito2/src/sst/index/applier.rs | 2 +- src/mito2/src/sst/index/applier/builder.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index e93aa6cb2271..858991264bb6 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -42,7 +42,7 @@ pub struct KafkaConfig { #[serde(skip)] #[serde(default)] pub compression: RsKafkaCompression, - /// The maximum log size a kakfa batch producer could buffer. + /// The maximum log size a kafka batch producer could buffer. pub max_batch_size: ReadableSize, /// The linger duration of a kafka batch producer. 
#[serde(with = "humantime_serde")] diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 564a20d33c0f..633aad007f1a 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -28,7 +28,7 @@ pub struct SstIndexApplier { /// Object store responsible for accessing SST files. object_store: ObjectStore, - /// Predifined index applier used to apply predicates to index files + /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. index_applier: Arc, } diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 6ecd0b0e5d5c..6414081ac713 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -14,7 +14,7 @@ mod between; -// TODO(zhongzc): This PR is too large. The following modules are comming soon. +// TODO(zhongzc): This PR is too large. The following modules are coming soon. // mod comparison; // mod eq_list; @@ -99,7 +99,7 @@ impl<'a> SstIndexApplierBuilder<'a> { match expr { DfExpr::Between(between) => self.collect_between(between), - // TODO(zhongzc): This PR is too large. The following arms are comming soon. + // TODO(zhongzc): This PR is too large. The following arms are coming soon. // DfExpr::InList(in_list) => self.collect_inlist(in_list), // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { @@ -145,7 +145,7 @@ impl<'a> SstIndexApplierBuilder<'a> { .then(|| column.column_schema.data_type.clone())) } - /// Helper funtion to get a non-null literal. + /// Helper function to get a non-null literal. fn nonnull_lit(expr: &DfExpr) -> Option<&ScalarValue> { match expr { DfExpr::Literal(lit) if !lit.is_null() => Some(lit), From 19c5f38f1057837f06ae6c0d7014950376d3907e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 29 Dec 2023 04:29:34 +0000 Subject: [PATCH 5/7] fix: address comments Signed-off-by: Zhenchi --- src/mito2/src/sst/index/applier.rs | 6 ++---- src/mito2/src/sst/index/applier/builder.rs | 19 ++++++++++++------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 633aad007f1a..95ca25ba003d 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -14,8 +14,6 @@ pub mod builder; -use std::sync::Arc; - use index::inverted_index::search::index_apply::IndexApplier; use object_store::ObjectStore; @@ -30,7 +28,7 @@ pub struct SstIndexApplier { /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. 
- index_applier: Arc, + index_applier: Box, } impl SstIndexApplier { @@ -38,7 +36,7 @@ impl SstIndexApplier { pub fn new( region_dir: String, object_store: ObjectStore, - index_applier: Arc, + index_applier: Box, ) -> Self { Self { region_dir, diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 6414081ac713..d3cf6dc799b8 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -22,10 +22,10 @@ mod between; // mod regex_match; use std::collections::HashMap; -use std::sync::Arc; use api::v1::SemanticType; use common_query::logical_plan::Expr; +use common_telemetry::warn; use datafusion_common::ScalarValue; use datafusion_expr::Expr as DfExpr; use datatypes::data_type::ConcreteDataType; @@ -77,7 +77,7 @@ impl<'a> SstIndexApplierBuilder<'a> { /// the expressions provided. If no predicates match, returns `None`. pub fn build(mut self, exprs: &[Expr]) -> Result> { for expr in exprs { - self.traverse_and_collect(expr.df_expr())?; + self.traverse_and_collect(expr.df_expr()); } if self.output.is_empty() { @@ -89,14 +89,14 @@ impl<'a> SstIndexApplierBuilder<'a> { Ok(Some(SstIndexApplier::new( self.region_dir, self.object_store, - Arc::new(applier.context(BuildIndexApplierSnafu)?), + Box::new(applier.context(BuildIndexApplierSnafu)?), ))) } /// Recursively traverses expressions to collect predicates. /// Results are stored in `self.output`. - fn traverse_and_collect(&mut self, expr: &DfExpr) -> Result<()> { - match expr { + fn traverse_and_collect(&mut self, expr: &DfExpr) { + let res = match expr { DfExpr::Between(between) => self.collect_between(between), // TODO(zhongzc): This PR is too large. The following arms are coming soon. @@ -104,8 +104,9 @@ impl<'a> SstIndexApplierBuilder<'a> { // DfExpr::InList(in_list) => self.collect_inlist(in_list), // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { // Operator::And => { - // self.traverse_and_collect(left)?; - // self.traverse_and_collect(right) + // self.traverse_and_collect(left); + // self.traverse_and_collect(right); + // Ok(()) // } // Operator::Or => self.collect_or_eq_list(left, right), // Operator::Eq => self.collect_eq(left, right), @@ -118,6 +119,10 @@ impl<'a> SstIndexApplierBuilder<'a> { // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ... _ => Ok(()), + }; + + if let Err(err) = res { + warn!("Failed to collect predicates, ignore it. error: {err}, expr: {expr}"); } } From a1792d1bb326b9f87832e75e16e6802ff168ca0f Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 29 Dec 2023 14:54:28 +0800 Subject: [PATCH 6/7] Update src/mito2/src/sst/index/applier/builder.rs Co-authored-by: Yingwen --- src/mito2/src/sst/index/applier/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index d3cf6dc799b8..95c812017cdb 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -122,7 +122,7 @@ impl<'a> SstIndexApplierBuilder<'a> { }; if let Err(err) = res { - warn!("Failed to collect predicates, ignore it. error: {err}, expr: {expr}"); + warn!(err; "Failed to collect predicates, ignore it. 
expr: {expr}"); } } From 9c980340f2f4ed7c56cc26d4996c35fab95f615b Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 29 Dec 2023 18:37:48 +0000 Subject: [PATCH 7/7] fix: remove unwrap Signed-off-by: Zhenchi --- src/mito2/src/error.rs | 8 ++++++++ src/mito2/src/sst/index/applier/builder.rs | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index a54956442596..610955c3aa0f 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -433,6 +433,13 @@ pub enum Error { error: index::inverted_index::error::Error, location: Location, }, + + #[snafu(display("Failed to convert value"))] + ConvertValue { + #[snafu(source)] + source: datatypes::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -516,6 +523,7 @@ impl ErrorExt for Error { JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, + ConvertValue { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 95c812017cdb..52af22effb18 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -36,7 +36,7 @@ use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; -use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result}; +use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; @@ -168,7 +168,7 @@ impl<'a> SstIndexApplierBuilder<'a> { /// Helper function to encode a literal into bytes. fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { - let value = Value::try_from(lit.clone()).unwrap(); + let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; let mut bytes = vec![]; let field = SortField::new(data_type); IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?;