Skip to content

Commit

Permalink
fix: promql scalar when input empty batch (#3779)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange authored Apr 23, 2024
1 parent f5e5a89 commit 924c52a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 41 deletions.
134 changes: 93 additions & 41 deletions src/promql/src/extension_plan/scalar_calculate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ impl RecordBatchStream for ScalarCalculateStream {
impl ScalarCalculateStream {
fn update_batch(&mut self, batch: RecordBatch) -> DataFusionResult<()> {
let _timer = self.metric.elapsed_compute();
// if have multi time series, scalar will return NaN
if self.have_multi_series {
// if have multi time series or empty batch, scalar will return NaN
if self.have_multi_series || batch.num_rows() == 0 {
return Ok(());
}
// fast path: no tag columns means all data belongs to the same series.
Expand Down Expand Up @@ -493,51 +493,18 @@ mod test {

use super::*;

fn prepare_test_data(diff_series: bool) -> MemoryExec {
fn prepare_test_data(series: Vec<RecordBatch>) -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let batch_1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
Arc::new(StringArray::from(vec!["foo", "foo"])),
Arc::new(StringArray::from(vec!["🥺", "🥺"])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let batch_2 = if diff_series {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
Arc::new(StringArray::from(vec!["foo", "foo"])),
Arc::new(StringArray::from(vec!["🥺", "😝"])),
Arc::new(Float64Array::from(vec![3.0, 4.0])),
],
)
.unwrap()
} else {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
Arc::new(StringArray::from(vec!["foo", "foo"])),
Arc::new(StringArray::from(vec!["🥺", "🥺"])),
Arc::new(Float64Array::from(vec![3.0, 4.0])),
],
)
.unwrap()
};
MemoryExec::try_new(&[vec![batch_1, batch_2]], schema, None).unwrap()
MemoryExec::try_new(&[series], schema, None).unwrap()
}

async fn run_test(diff_series: bool, expected: &str) {
let memory_exec = Arc::new(prepare_test_data(diff_series));
async fn run_test(series: Vec<RecordBatch>, expected: &str) {
let memory_exec = Arc::new(prepare_test_data(series));
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("val", DataType::Float64, true),
Expand Down Expand Up @@ -570,8 +537,35 @@ mod test {

#[tokio::test]
async fn same_series() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
run_test(
false,
vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
Arc::new(StringArray::from(vec!["foo", "foo"])),
Arc::new(StringArray::from(vec!["🥺", "🥺"])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
Arc::new(StringArray::from(vec!["foo", "foo"])),
Arc::new(StringArray::from(vec!["🥺", "🥺"])),
Arc::new(Float64Array::from(vec![3.0, 4.0])),
],
)
.unwrap(),
],
"+---------------------+-----+\
\n| ts | val |\
\n+---------------------+-----+\
Expand All @@ -586,8 +580,66 @@ mod test {

#[tokio::test]
async fn diff_series() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("tag1", DataType::Utf8, true),
        Field::new("tag2", DataType::Utf8, true),
        Field::new("val", DataType::Float64, true),
    ]));
    // Two batches belonging to *different* time series (the second batch's
    // last row has a different `tag2` value), so scalar() must produce NaN
    // for every timestamp.
    run_test(
        vec![
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                    Arc::new(StringArray::from(vec!["foo", "foo"])),
                    Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                    Arc::new(Float64Array::from(vec![1.0, 2.0])),
                ],
            )
            .unwrap(),
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
                    Arc::new(StringArray::from(vec!["foo", "foo"])),
                    Arc::new(StringArray::from(vec!["🥺", "😝"])),
                    Arc::new(Float64Array::from(vec![3.0, 4.0])),
                ],
            )
            .unwrap(),
        ],
        "+---------------------+-----+\
        \n| ts                  | val |\
        \n+---------------------+-----+\
        \n| 1970-01-01T00:00:00 | NaN |\
        \n| 1970-01-01T00:00:05 | NaN |\
        \n| 1970-01-01T00:00:10 | NaN |\
        \n| 1970-01-01T00:00:15 | NaN |\
        \n+---------------------+-----+",
    )
    .await
}

#[tokio::test]
async fn empty_series() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
run_test(
vec![RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::new_null(0)),
Arc::new(StringArray::new_null(0)),
Arc::new(StringArray::new_null(0)),
Arc::new(Float64Array::new_null(0)),
],
)
.unwrap()],
"+---------------------+-----+\
\n| ts | val |\
\n+---------------------+-----+\
Expand Down
29 changes: 29 additions & 0 deletions tests/cases/standalone/common/promql/scalar.result
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,35 @@ TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));
| 1970-01-01T00:00:15 | NaN |
+---------------------+-----------------------------------+

-- No data input in scalar
TQL EVAL (350, 360, '5s') scalar(host{host="host1"});

+---------------------+-------------+
| ts | scalar(val) |
+---------------------+-------------+
| 1970-01-01T00:05:50 | NaN |
| 1970-01-01T00:05:55 | NaN |
| 1970-01-01T00:06:00 | NaN |
+---------------------+-------------+

DELETE from host where ts = 0;

Affected Rows: 2

-- In this case, InstantManipulate receives a valid record batch but outputs an empty record batch (because no data is selected from this batch)
-- Test feeding an empty record batch into the ScalarCalculate plan
TQL EVAL (0, 1600, '6m40s') scalar(host{host="host1"});

+---------------------+-------------+
| ts | scalar(val) |
+---------------------+-------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:06:40 | NaN |
| 1970-01-01T00:13:20 | NaN |
| 1970-01-01T00:20:00 | NaN |
| 1970-01-01T00:26:40 | NaN |
+---------------------+-------------+

-- error case
TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"}));

Expand Down
9 changes: 9 additions & 0 deletions tests/cases/standalone/common/promql/scalar.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"};
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));

-- No data input in scalar
TQL EVAL (350, 360, '5s') scalar(host{host="host1"});

DELETE from host where ts = 0;

-- In this case, InstantManipulate receives a valid record batch but outputs an empty record batch (because no data is selected from this batch)
-- Test feeding an empty record batch into the ScalarCalculate plan
TQL EVAL (0, 1600, '6m40s') scalar(host{host="host1"});

-- error case

TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"}));
Expand Down

0 comments on commit 924c52a

Please sign in to comment.