Skip to content

Commit

Permalink
fix: lots of corner cases in PromQL (#1345)
Browse files Browse the repository at this point in the history
* adjust plan ordering
fix offset logic
ignore empty range vector

Signed-off-by: Ruihang Xia <[email protected]>

* fix: different NaN logic between instant and range selector

Signed-off-by: Ruihang Xia <[email protected]>

* fix: enlarge selector time window

Signed-off-by: Ruihang Xia <[email protected]>

* revert change about stale NaN

Signed-off-by: Ruihang Xia <[email protected]>

* fix tests

Signed-off-by: Ruihang Xia <[email protected]>

* clean up

Signed-off-by: Ruihang Xia <[email protected]>

* rename variables

Signed-off-by: Ruihang Xia <[email protected]>

* one more rename

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Apr 10, 2023
1 parent 29c6155 commit 09f003d
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 75 deletions.
12 changes: 6 additions & 6 deletions src/frontend/src/tests/promql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,12 @@ async fn stddev_by_label(instance: Arc<dyn MockInstance>) {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+----------+---------------------+-----------------------------+\
\n| instance | ts | STDDEV(http_requests.value) |\
\n+----------+---------------------+-----------------------------+\
\n| 0 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n| 1 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n+----------+---------------------+-----------------------------+",
"+----------+---------------------+--------------------------------+\
\n| instance | ts | STDDEVPOP(http_requests.value) |\
\n+----------+---------------------+--------------------------------+\
\n| 0 | 1970-01-01T00:00:00 | 223.606797749979 |\
\n| 1 | 1970-01-01T00:00:00 | 223.606797749979 |\
\n+----------+---------------------+--------------------------------+",
)
.await;
}
Expand Down
35 changes: 33 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
Expand Down Expand Up @@ -49,6 +49,8 @@ pub struct InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
/// A optional column for validating staleness
field_column: Option<String>,
input: LogicalPlan,
}

Expand Down Expand Up @@ -86,6 +88,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: inputs[0].clone(),
}
}
Expand All @@ -98,6 +101,7 @@ impl InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,
input: LogicalPlan,
) -> Self {
Self {
Expand All @@ -106,6 +110,7 @@ impl InstantManipulate {
lookback_delta,
interval,
time_index_column,
field_column,
input,
}
}
Expand All @@ -117,6 +122,7 @@ impl InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
Expand All @@ -130,6 +136,7 @@ pub struct InstantManipulateExec {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,

input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -171,6 +178,7 @@ impl ExecutionPlan for InstantManipulateExec {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
}))
Expand All @@ -189,12 +197,18 @@ impl ExecutionPlan for InstantManipulateExec {
.column_with_name(&self.time_index_column)
.expect("time index column not found")
.0;
let field_index = self
.field_column
.as_ref()
.and_then(|name| schema.column_with_name(name))
.map(|x| x.0);
Ok(Box::pin(InstantManipulateStream {
start: self.start,
end: self.end,
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index,
field_index,
schema,
input,
metric: baseline_metric,
Expand Down Expand Up @@ -244,6 +258,7 @@ pub struct InstantManipulateStream {
interval: Millisecond,
// Column index of TIME INDEX column's position in schema
time_index: usize,
field_index: Option<usize>,

schema: SchemaRef,
input: SendableRecordBatchStream,
Expand Down Expand Up @@ -282,6 +297,11 @@ impl InstantManipulateStream {
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();

// field column for staleness check
let field_column = self
.field_index
.and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());

let mut cursor = 0;
let aligned_ts = (self.start..=self.end)
.step_by(self.interval as usize)
Expand All @@ -294,7 +314,12 @@ impl InstantManipulateStream {
let curr = ts_column.value(cursor);
match curr.cmp(&expected_ts) {
Ordering::Equal => {
take_indices.push(Some(cursor as u64));
if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// ignore the NaN value
take_indices.push(None);
} else {
take_indices.push(Some(cursor as u64));
}
continue 'next;
}
Ordering::Greater => break,
Expand All @@ -308,6 +333,11 @@ impl InstantManipulateStream {
// then, search backward to lookback
loop {
let curr = ts_column.value(cursor);
if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
break;
}
if curr + self.lookback_delta < expected_ts {
// not found in lookback, leave this field blank.
take_indices.push(None);
Expand Down Expand Up @@ -413,6 +443,7 @@ mod test {
lookback_delta,
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: None,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand Down
37 changes: 26 additions & 11 deletions src/promql/src/extension_plan/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ use crate::extension_plan::Millisecond;
/// Roughly speaking, this method does these things:
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
/// - remove NaN values
/// - remove NaN values (optional)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesNormalize {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,

input: LogicalPlan,
}
Expand All @@ -71,8 +72,8 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"PromSeriesNormalize: offset=[{}], time index=[{}]",
self.offset, self.time_index_column_name
"PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
self.offset, self.time_index_column_name, self.need_filter_out_nan
)
}

Expand All @@ -82,6 +83,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: inputs[0].clone(),
}
}
Expand All @@ -91,11 +93,13 @@ impl SeriesNormalize {
pub fn new<N: AsRef<str>>(
offset: Millisecond,
time_index_column_name: N,
need_filter_out_nan: bool,
input: LogicalPlan,
) -> Self {
Self {
offset,
time_index_column_name: time_index_column_name.as_ref().to_string(),
need_filter_out_nan,
input,
}
}
Expand All @@ -104,6 +108,7 @@ impl SeriesNormalize {
Arc::new(SeriesNormalizeExec {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
Expand All @@ -114,6 +119,7 @@ impl SeriesNormalize {
pub struct SeriesNormalizeExec {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,

input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -148,6 +154,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
Ok(Arc::new(Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: children[0].clone(),
metric: self.metric.clone(),
}))
Expand All @@ -169,6 +176,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
Ok(Box::pin(SeriesNormalizeStream {
offset: self.offset,
time_index,
need_filter_out_nan: self.need_filter_out_nan,
schema,
input,
metric: baseline_metric,
Expand All @@ -180,8 +188,8 @@ impl ExecutionPlan for SeriesNormalizeExec {
DisplayFormatType::Default => {
write!(
f,
"PromSeriesNormalizeExec: offset=[{}], time index=[{}]",
self.offset, self.time_index_column_name
"PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
self.offset, self.time_index_column_name, self.need_filter_out_nan
)
}
}
Expand All @@ -200,6 +208,7 @@ pub struct SeriesNormalizeStream {
offset: Millisecond,
// Column index of TIME INDEX column's position in schema
time_index: usize,
need_filter_out_nan: bool,

schema: SchemaRef,
input: SendableRecordBatchStream,
Expand All @@ -220,7 +229,7 @@ impl SeriesNormalizeStream {
ts_column.clone()
} else {
TimestampMillisecondArray::from_iter(
ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
)
};
let mut columns = input.columns().to_vec();
Expand All @@ -234,6 +243,10 @@ impl SeriesNormalizeStream {
.collect::<ArrowResult<Vec<_>>>()?;
let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;

if !self.need_filter_out_nan {
return Ok(ordered_batch);
}

// TODO(ruihang): consider the "special NaN"
// filter out NaN
let mut filter = vec![true; input.num_rows()];
Expand Down Expand Up @@ -317,6 +330,7 @@ mod test {
let normalize_exec = Arc::new(SeriesNormalizeExec {
offset: 0,
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand Down Expand Up @@ -349,6 +363,7 @@ mod test {
let normalize_exec = Arc::new(SeriesNormalizeExec {
offset: 1_000, // offset 1s
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand All @@ -364,11 +379,11 @@ mod test {
"+---------------------+--------+------+\
\n| timestamp | value | path |\
\n+---------------------+--------+------+\
\n| 1969-12-31T23:59:59 | 10.0 | foo |\
\n| 1970-01-01T00:00:29 | 100.0 | foo |\
\n| 1970-01-01T00:00:59 | 0.0 | foo |\
\n| 1970-01-01T00:01:29 | 1000.0 | foo |\
\n| 1970-01-01T00:01:59 | 1.0 | foo |\
\n| 1970-01-01T00:00:01 | 10.0 | foo |\
\n| 1970-01-01T00:00:31 | 100.0 | foo |\
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
\n| 1970-01-01T00:01:31 | 1000.0 | foo |\
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
\n+---------------------+--------+------+",
);

Expand Down
25 changes: 19 additions & 6 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,20 @@ impl Stream for RangeManipulateStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = match self.input.poll_next_unpin(cx) {
Poll::Ready(batch) => {
let _timer = self.metric.elapsed_compute().timer();
Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch))))
let poll = loop {
match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(batch))) => {
let _timer = self.metric.elapsed_compute().timer();
let result = self.manipulate(batch);
if let Ok(None) = result {
continue;
} else {
break Poll::Ready(result.transpose());
}
}
Poll::Ready(other) => break Poll::Ready(other),
Poll::Pending => break Poll::Pending,
}
Poll::Pending => Poll::Pending,
};
self.metric.record_poll(poll)
}
Expand All @@ -357,10 +365,14 @@ impl RangeManipulateStream {
// Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
// But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
// And the generated timestamp is not aligned to the step. It's expected to do later.
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let (aligned_ts, ranges) = self.calculate_range(&input);
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
}

// transform columns
let mut new_columns = input.columns().to_vec();
Expand Down Expand Up @@ -391,6 +403,7 @@ impl RangeManipulateStream {
new_columns[self.time_index] = aligned_ts;

RecordBatch::try_new(self.output_schema.clone(), new_columns)
.map(Some)
.map_err(DataFusionError::ArrowError)
}

Expand Down
14 changes: 9 additions & 5 deletions src/promql/src/functions/aggr_over_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Op
ret = "Float64Array",
display_name = "prom_count_over_time"
)]
pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> f64 {
values.len() as f64
pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
if values.is_empty() {
None
} else {
Some(values.len() as f64)
}
}

/// The most recent point value in specified interval.
Expand Down Expand Up @@ -320,13 +324,13 @@ mod test {
Some(2.0),
Some(5.0),
Some(1.0),
Some(0.0),
Some(0.0),
None,
None,
Some(3.0),
Some(3.0),
Some(3.0),
Some(1.0),
Some(0.0),
None,
],
);
}
Expand Down
1 change: 1 addition & 0 deletions src/promql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(option_get_or_insert_default)]
#![feature(let_chains)]

pub mod error;
pub mod extension_plan;
Expand Down
Loading

0 comments on commit 09f003d

Please sign in to comment.