diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 0c16c861005a..ff2195e532ee 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -19,6 +19,8 @@ mod normalize; mod planner; mod range_manipulate; mod series_divide; +#[cfg(test)] +mod test_util; mod union_distinct_on; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index ba155627d2c5..e65592bb374e 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -445,40 +445,12 @@ impl InstantManipulateStream { #[cfg(test)] mod test { - use datafusion::arrow::array::Float64Array; - use datafusion::arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, - }; - use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; - use datatypes::arrow::array::TimestampMillisecondArray; - use datatypes::arrow_array::StringArray; use super::*; - - const TIME_INDEX_COLUMN: &str = "timestamp"; - - fn prepare_test_data() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - Field::new("path", DataType::Utf8, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - 180_000, 240_000, // every 60s - 241_000, 271_000, 291_000, // others - ])) as _; - let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; - let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; - let data = RecordBatch::try_new( - schema.clone(), - vec![timestamp_column, field_column, path_column], - ) - .unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } + use 
crate::extension_plan::test_util::{ + prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN, + }; async fn do_normalize_test( start: Millisecond, @@ -749,22 +721,6 @@ mod test { do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await; } - fn prepare_test_data_with_nan() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - ])) as _; - let field_column = - Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; - let data = - RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } - #[tokio::test] async fn lookback_10s_interval_10s_with_nan() { let expected = String::from( diff --git a/src/promql/src/extension_plan/test_util.rs b/src/promql/src/extension_plan/test_util.rs new file mode 100644 index 000000000000..f751cb9fa84b --- /dev/null +++ b/src/promql/src/extension_plan/test_util.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Utils for testing extension plan + +use std::sync::Arc; + +use common_recordbatch::DfRecordBatch as RecordBatch; +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, +}; +use datafusion::physical_plan::memory::MemoryExec; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow_array::StringArray; + +pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp"; + +pub(crate) fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + Field::new("path", DataType::Utf8, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; + let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, field_column, path_column], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} + +pub(crate) fn prepare_test_data_with_nan() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; + let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} diff --git a/src/promql/src/extension_plan/union_distinct_on.rs 
b/src/promql/src/extension_plan/union_distinct_on.rs index 99da262f1c40..22551b73f810 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -35,12 +35,31 @@ use datatypes::arrow::compute; use futures::future::BoxFuture; use futures::{ready, Stream, StreamExt, TryStreamExt}; +/// A special kind of `UNION` (`OR` in PromQL) operator, for PromQL-specific use cases. +/// +/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The +/// main difference is that it treats the left child and the right child differently: +/// - All columns from the left child will be output. +/// - Only check collisions (when not distinct) on the columns specified by `compare_keys`. +/// - When there is a collision: +/// - If the collision is from the right child itself, only the first observed row will be +/// preserved. All others are discarded. +/// - If the collision is from the left child, the row in the right child will be discarded. +/// - The output order is not maintained. This plan will output the left child first, then the right child. +/// - The output schema contains all columns from the left or right child plans. +/// +/// From the implementation perspective, this operator is similar to `HashJoin`, but the +/// probe side is the right child, and the build side is the left child. Another difference +/// is that the probe is opt-out. +/// +/// This plan will exhaust the right child first to build the probe hash table, then stream +/// the left side, and use the left side to "mask" the hash table. #[derive(Debug, PartialEq, Eq, Hash)] pub struct UnionDistinctOn { left: LogicalPlan, right: LogicalPlan, /// The columns to compare for equality. - /// TIME INDEX is not included. + /// TIME INDEX is included. compare_keys: Vec, ts_col: String, output_schema: DFSchemaRef, @@ -250,6 +269,7 @@ impl DisplayAs for UnionDistinctOnExec { } } +// TODO(ruihang): some unused fields are for metrics, which will be implemented later. 
#[allow(dead_code)] pub struct UnionDistinctOnStream { left: SendableRecordBatchStream, @@ -465,3 +485,92 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult