add UTs

Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed Dec 27, 2023
1 parent 3a6e82b commit 288e591
Showing 4 changed files with 179 additions and 48 deletions.
2 changes: 2 additions & 0 deletions src/promql/src/extension_plan.rs
@@ -19,6 +19,8 @@ mod normalize;
mod planner;
mod range_manipulate;
mod series_divide;
#[cfg(test)]
mod test_util;
mod union_distinct_on;

use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
50 changes: 3 additions & 47 deletions src/promql/src/extension_plan/instant_manipulate.rs
@@ -445,40 +445,12 @@ impl InstantManipulateStream {

#[cfg(test)]
mod test {
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;

use super::*;

const TIME_INDEX_COLUMN: &str = "timestamp";

fn prepare_test_data() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],
)
.unwrap();

MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
use crate::extension_plan::test_util::{
prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN,
};

async fn do_normalize_test(
start: Millisecond,
@@ -749,22 +721,6 @@
do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
}

fn prepare_test_data_with_nan() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
let field_column =
Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _;
let data =
RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();

MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}

#[tokio::test]
async fn lookback_10s_interval_10s_with_nan() {
let expected = String::from(
64 changes: 64 additions & 0 deletions src/promql/src/extension_plan/test_util.rs
@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for testing extension plans.

use std::sync::Arc;

use common_recordbatch::DfRecordBatch as RecordBatch;
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::physical_plan::memory::MemoryExec;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;

pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp";

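/// Builds a single batch with `timestamp`, `value`, and `path` columns over an uneven
/// time grid, wrapped in a one-partition `MemoryExec`.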
pub(crate) fn prepare_test_data() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],
)
.unwrap();

MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}

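/// Builds a single batch with `timestamp` and `value` columns whose values interleave
/// `f64::NAN`, wrapped in a one-partition `MemoryExec`.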
pub(crate) fn prepare_test_data_with_nan() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _;
let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();

MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
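
A minimal sketch of how these fixtures can be consumed in a test. The test name is
hypothetical, and it assumes the crate's existing `datafusion`, `futures`, and `tokio`
dev-dependencies:

#[tokio::test]
async fn consume_prepared_data() {
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use futures::TryStreamExt;

    // `prepare_test_data` yields one partition holding one ten-row batch.
    let exec = prepare_test_data();
    let task_ctx = SessionContext::new().task_ctx();
    let batches: Vec<_> = exec
        .execute(0, task_ctx)
        .unwrap()
        .try_collect()
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].num_rows(), 10);
}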
111 changes: 110 additions & 1 deletion src/promql/src/extension_plan/union_distinct_on.rs
@@ -35,12 +35,31 @@ use datatypes::arrow::compute;
use futures::future::BoxFuture;
use futures::{ready, Stream, StreamExt, TryStreamExt};

/// A special kind of `UNION` (`OR` in PromQL) operator, for a PromQL-specific use case.
///
/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The
/// key difference is that it treats the left child and the right child differently:
/// - All columns from the left child are output.
/// - Collisions (rows that are not distinct) are checked only on the columns
///   specified by `compare_keys`.
/// - When there is a collision:
///   - If the collision is within the right child itself, only the first observed row
///     is preserved; all others are discarded.
///   - If the collision is with the left child, the row from the right child is discarded.
/// - The output order is not maintained. This plan outputs the left child first, then
///   the right child.
/// - The output schema contains all columns from the left or right child plans.
///
/// From the implementation perspective, this operator is similar to `HashJoin`, except
/// that the build side is the right child, the probe side is the left child, and the
/// probe opts rows out: a match discards rows instead of producing joined rows.
///
/// This plan exhausts the right child first to build the hash table, then streams the
/// left side and uses its rows to "mask" matching entries in the hash table.
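///
/// As an illustration (hypothetical rows, with `compare_keys = ["ts"]`): left input
/// `ts = 1, 2` and right input `ts = 2, 3, 3` produce `ts = 1, 2` (all of the left
/// child) followed by `ts = 3` (the first such row from the right child). The
/// right-side `ts = 2` collides with the left child, and the second `ts = 3` collides
/// within the right child, so both are discarded.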
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct UnionDistinctOn {
left: LogicalPlan,
right: LogicalPlan,
/// The columns to compare for equality.
- /// TIME INDEX is not included.
+ /// TIME INDEX is included.
compare_keys: Vec<String>,
ts_col: String,
output_schema: DFSchemaRef,
@@ -250,6 +269,7 @@ impl DisplayAs for UnionDistinctOnExec {
}
}

// TODO(ruihang): some unused fields are for metrics, which will be implemented later.
#[allow(dead_code)]
pub struct UnionDistinctOnStream {
left: SendableRecordBatchStream,
@@ -465,3 +485,92 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult<RecordBatch> {
let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?;
Ok(result)
}

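// The body of `take_batch` is elided by the hunk above. A plausible sketch (an
// assumption, not necessarily the committed code) gathers the requested rows column
// by column with arrow's `take` kernel, assuming `UInt64Array` is in scope:
//
//     let idx = UInt64Array::from_iter(indices.iter().map(|i| *i as u64));
//     let arrays = batch
//         .columns()
//         .iter()
//         .map(|col| compute::take(col, &idx, None))
//         .collect::<Result<Vec<_>, _>>()
//         .map_err(DataFusionError::ArrowError)?;
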
#[cfg(test)]
mod test {
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

use super::*;

#[test]
fn test_interleave_batches() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);

let batch1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)
.unwrap();

let batch2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from(vec![10, 11, 12])),
],
)
.unwrap();

let batch3 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![13, 14, 15])),
Arc::new(Int32Array::from(vec![16, 17, 18])),
],
)
.unwrap();

let batches = vec![batch1, batch2, batch3];
let indices = vec![(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)];
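// Each pair is (batch_index, row_index): row 0 of every batch, then row 1 of every batch.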
let result = interleave_batches(batches, indices).unwrap();

let expected = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![1, 7, 13, 2, 8, 14])),
Arc::new(Int32Array::from(vec![4, 10, 16, 5, 11, 17])),
],
)
.unwrap();

assert_eq!(result, expected);
}

#[test]
fn test_take_batch() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);

let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)
.unwrap();

let indices = vec![0, 2];
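// Keep only rows 0 and 2 of the three-row batch.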
let result = take_batch(&batch, &indices).unwrap();

let expected = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![1, 3])),
Arc::new(Int32Array::from(vec![4, 6])),
],
)
.unwrap();

assert_eq!(result, expected);
}
}
