Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lots of corner cases in PromQL #1345

Merged
merged 8 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/frontend/src/tests/promql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,12 @@ async fn stddev_by_label(instance: Arc<dyn MockInstance>) {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+----------+---------------------+-----------------------------+\
\n| instance | ts | STDDEV(http_requests.value) |\
\n+----------+---------------------+-----------------------------+\
\n| 0 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n| 1 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n+----------+---------------------+-----------------------------+",
"+----------+---------------------+--------------------------------+\
\n| instance | ts | STDDEVPOP(http_requests.value) |\
\n+----------+---------------------+--------------------------------+\
\n| 0 | 1970-01-01T00:00:00 | 223.606797749979 |\
\n| 1 | 1970-01-01T00:00:00 | 223.606797749979 |\
\n+----------+---------------------+--------------------------------+",
)
.await;
}
Expand Down
35 changes: 33 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
Expand Down Expand Up @@ -49,6 +49,8 @@ pub struct InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
/// A optional column for validating staleness
field_column: Option<String>,
input: LogicalPlan,
}

Expand Down Expand Up @@ -86,6 +88,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: inputs[0].clone(),
}
}
Expand All @@ -98,6 +101,7 @@ impl InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,
input: LogicalPlan,
) -> Self {
Self {
Expand All @@ -106,6 +110,7 @@ impl InstantManipulate {
lookback_delta,
interval,
time_index_column,
field_column,
input,
}
}
Expand All @@ -117,6 +122,7 @@ impl InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
Expand All @@ -130,6 +136,7 @@ pub struct InstantManipulateExec {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,

input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -171,6 +178,7 @@ impl ExecutionPlan for InstantManipulateExec {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
}))
Expand All @@ -189,12 +197,18 @@ impl ExecutionPlan for InstantManipulateExec {
.column_with_name(&self.time_index_column)
.expect("time index column not found")
.0;
let field_index = self
.field_column
.as_ref()
.and_then(|name| schema.column_with_name(name))
.map(|x| x.0);
Ok(Box::pin(InstantManipulateStream {
start: self.start,
end: self.end,
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index,
field_index,
schema,
input,
metric: baseline_metric,
Expand Down Expand Up @@ -244,6 +258,7 @@ pub struct InstantManipulateStream {
interval: Millisecond,
// Column index of TIME INDEX column's position in schema
time_index: usize,
field_index: Option<usize>,

schema: SchemaRef,
input: SendableRecordBatchStream,
Expand Down Expand Up @@ -282,6 +297,11 @@ impl InstantManipulateStream {
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();

// field column for staleness check
let field_column = self
.field_index
.and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());

let mut cursor = 0;
let aligned_ts = (self.start..=self.end)
.step_by(self.interval as usize)
Expand All @@ -294,7 +314,12 @@ impl InstantManipulateStream {
let curr = ts_column.value(cursor);
match curr.cmp(&expected_ts) {
Ordering::Equal => {
take_indices.push(Some(cursor as u64));
if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// ignore the NaN value
take_indices.push(None);
} else {
take_indices.push(Some(cursor as u64));
}
continue 'next;
}
Ordering::Greater => break,
Expand All @@ -308,6 +333,11 @@ impl InstantManipulateStream {
// then, search backward to lookback
loop {
let curr = ts_column.value(cursor);
if let Some(value_column) = &field_column && value_column.value(cursor).is_nan() {
waynexia marked this conversation as resolved.
Show resolved Hide resolved
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
break;
}
if curr + self.lookback_delta < expected_ts {
// not found in lookback, leave this field blank.
take_indices.push(None);
Expand Down Expand Up @@ -413,6 +443,7 @@ mod test {
lookback_delta,
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: None,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand Down
37 changes: 26 additions & 11 deletions src/promql/src/extension_plan/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ use crate::extension_plan::Millisecond;
/// Roughly speaking, this method does these things:
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
/// - remove NaN values
/// - remove NaN values (optional)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesNormalize {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,

input: LogicalPlan,
}
Expand All @@ -71,8 +72,8 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"PromSeriesNormalize: offset=[{}], time index=[{}]",
self.offset, self.time_index_column_name
"PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
self.offset, self.time_index_column_name, self.need_filter_out_nan
)
}

Expand All @@ -82,6 +83,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: inputs[0].clone(),
}
}
Expand All @@ -91,11 +93,13 @@ impl SeriesNormalize {
pub fn new<N: AsRef<str>>(
offset: Millisecond,
time_index_column_name: N,
need_filter_out_nan: bool,
input: LogicalPlan,
) -> Self {
Self {
offset,
time_index_column_name: time_index_column_name.as_ref().to_string(),
need_filter_out_nan,
input,
}
}
Expand All @@ -104,6 +108,7 @@ impl SeriesNormalize {
Arc::new(SeriesNormalizeExec {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
Expand All @@ -114,6 +119,7 @@ impl SeriesNormalize {
pub struct SeriesNormalizeExec {
offset: Millisecond,
time_index_column_name: String,
need_filter_out_nan: bool,

input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -148,6 +154,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
Ok(Arc::new(Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: children[0].clone(),
metric: self.metric.clone(),
}))
Expand All @@ -169,6 +176,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
Ok(Box::pin(SeriesNormalizeStream {
offset: self.offset,
time_index,
need_filter_out_nan: self.need_filter_out_nan,
schema,
input,
metric: baseline_metric,
Expand All @@ -180,8 +188,8 @@ impl ExecutionPlan for SeriesNormalizeExec {
DisplayFormatType::Default => {
write!(
f,
"PromSeriesNormalizeExec: offset=[{}], time index=[{}]",
self.offset, self.time_index_column_name
"PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
self.offset, self.time_index_column_name, self.need_filter_out_nan
)
}
}
Expand All @@ -200,6 +208,7 @@ pub struct SeriesNormalizeStream {
offset: Millisecond,
// Column index of TIME INDEX column's position in schema
time_index: usize,
need_filter_out_nan: bool,

schema: SchemaRef,
input: SendableRecordBatchStream,
Expand All @@ -220,7 +229,7 @@ impl SeriesNormalizeStream {
ts_column.clone()
} else {
TimestampMillisecondArray::from_iter(
ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
)
};
let mut columns = input.columns().to_vec();
Expand All @@ -234,6 +243,10 @@ impl SeriesNormalizeStream {
.collect::<ArrowResult<Vec<_>>>()?;
let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;

if !self.need_filter_out_nan {
return Ok(ordered_batch);
}

// TODO(ruihang): consider the "special NaN"
// filter out NaN
let mut filter = vec![true; input.num_rows()];
Expand Down Expand Up @@ -317,6 +330,7 @@ mod test {
let normalize_exec = Arc::new(SeriesNormalizeExec {
offset: 0,
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand Down Expand Up @@ -349,6 +363,7 @@ mod test {
let normalize_exec = Arc::new(SeriesNormalizeExec {
offset: 1_000, // offset 1s
time_index_column_name: TIME_INDEX_COLUMN.to_string(),
need_filter_out_nan: true,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand All @@ -364,11 +379,11 @@ mod test {
"+---------------------+--------+------+\
\n| timestamp | value | path |\
\n+---------------------+--------+------+\
\n| 1969-12-31T23:59:59 | 10.0 | foo |\
\n| 1970-01-01T00:00:29 | 100.0 | foo |\
\n| 1970-01-01T00:00:59 | 0.0 | foo |\
\n| 1970-01-01T00:01:29 | 1000.0 | foo |\
\n| 1970-01-01T00:01:59 | 1.0 | foo |\
\n| 1970-01-01T00:00:01 | 10.0 | foo |\
\n| 1970-01-01T00:00:31 | 100.0 | foo |\
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
\n| 1970-01-01T00:01:31 | 1000.0 | foo |\
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
\n+---------------------+--------+------+",
);

Expand Down
25 changes: 19 additions & 6 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,20 @@ impl Stream for RangeManipulateStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = match self.input.poll_next_unpin(cx) {
Poll::Ready(batch) => {
let _timer = self.metric.elapsed_compute().timer();
Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch))))
let poll = loop {
match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(batch))) => {
let _timer = self.metric.elapsed_compute().timer();
let result = self.manipulate(batch);
if let Ok(None) = result {
continue;
} else {
break Poll::Ready(result.transpose());
}
}
Poll::Ready(other) => break Poll::Ready(other),
Poll::Pending => break Poll::Pending,
}
Poll::Pending => Poll::Pending,
};
self.metric.record_poll(poll)
}
Expand All @@ -357,10 +365,14 @@ impl RangeManipulateStream {
// Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
// But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
// And the generated timestamp is not aligned to the step. It's expected to do later.
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let (aligned_ts, ranges) = self.calculate_range(&input);
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
}

// transform columns
let mut new_columns = input.columns().to_vec();
Expand Down Expand Up @@ -391,6 +403,7 @@ impl RangeManipulateStream {
new_columns[self.time_index] = aligned_ts;

RecordBatch::try_new(self.output_schema.clone(), new_columns)
.map(Some)
.map_err(DataFusionError::ArrowError)
}

Expand Down
14 changes: 9 additions & 5 deletions src/promql/src/functions/aggr_over_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Op
ret = "Float64Array",
display_name = "prom_count_over_time"
)]
pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> f64 {
values.len() as f64
pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
if values.is_empty() {
None
} else {
Some(values.len() as f64)
}
}

/// The most recent point value in specified interval.
Expand Down Expand Up @@ -320,13 +324,13 @@ mod test {
Some(2.0),
Some(5.0),
Some(1.0),
Some(0.0),
Some(0.0),
None,
None,
Some(3.0),
Some(3.0),
Some(3.0),
Some(1.0),
Some(0.0),
None,
],
);
}
Expand Down
1 change: 1 addition & 0 deletions src/promql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(option_get_or_insert_default)]
#![feature(let_chains)]

pub mod error;
pub mod extension_plan;
Expand Down
Loading