diff --git a/src/frontend/src/tests/promql_test.rs b/src/frontend/src/tests/promql_test.rs
index 34db1c28a370..b26a24b870e9 100644
--- a/src/frontend/src/tests/promql_test.rs
+++ b/src/frontend/src/tests/promql_test.rs
@@ -468,12 +468,12 @@ async fn stddev_by_label(instance: Arc<Instance>) {
         unix_epoch_plus_100s(),
         Duration::from_secs(60),
         Duration::from_secs(0),
-        "+----------+---------------------+-----------------------------+\
-        \n| instance | ts                  | STDDEV(http_requests.value) |\
-        \n+----------+---------------------+-----------------------------+\
-        \n| 0        | 1970-01-01T00:00:00 | 258.19888974716116          |\
-        \n| 1        | 1970-01-01T00:00:00 | 258.19888974716116          |\
-        \n+----------+---------------------+-----------------------------+",
+        "+----------+---------------------+--------------------------------+\
+        \n| instance | ts                  | STDDEVPOP(http_requests.value) |\
+        \n+----------+---------------------+--------------------------------+\
+        \n| 0        | 1970-01-01T00:00:00 | 223.606797749979               |\
+        \n| 1        | 1970-01-01T00:00:00 | 223.606797749979               |\
+        \n+----------+---------------------+--------------------------------+",
     )
     .await;
 }
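Side note on the hunk above: PromQL's `stddev` aggregation is a population standard deviation, so the plan now maps it to a population variant (`STDDEVPOP`) instead of the sample `STDDEV`. A minimal sketch, not part of the patch, that reproduces both expected numbers — assuming each instance contributes the samples {100, 300, 500, 700}, which is only an illustrative guess at the test fixture:

```rust
// Population vs. sample standard deviation for the assumed per-instance samples.
fn main() {
    let values = [100.0_f64, 300.0, 500.0, 700.0];
    let n = values.len() as f64;
    let mean = values.iter().sum::<f64>() / n;
    let sum_sq_dev: f64 = values.iter().map(|v| (v - mean).powi(2)).sum();

    let sample = (sum_sq_dev / (n - 1.0)).sqrt(); // 258.19888974716116 (old expectation)
    let population = (sum_sq_dev / n).sqrt(); // 223.606797749979 (new expectation)
    println!("sample = {sample}, population = {population}");
}
```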
diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index b9afa7d61fb1..991b3832508f 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -18,7 +18,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use datafusion::arrow::array::{Array, TimestampMillisecondArray, UInt64Array};
+use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::DFSchemaRef;
@@ -49,6 +49,8 @@ pub struct InstantManipulate {
     lookback_delta: Millisecond,
     interval: Millisecond,
     time_index_column: String,
+    /// An optional column for validating staleness
+    field_column: Option<String>,
 
     input: LogicalPlan,
 }
@@ -86,6 +88,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
+            field_column: self.field_column.clone(),
             input: inputs[0].clone(),
         }
     }
@@ -98,6 +101,7 @@ impl InstantManipulate {
         lookback_delta: Millisecond,
         interval: Millisecond,
         time_index_column: String,
+        field_column: Option<String>,
         input: LogicalPlan,
     ) -> Self {
         Self {
@@ -106,6 +110,7 @@ impl InstantManipulate {
             lookback_delta,
             interval,
             time_index_column,
+            field_column,
             input,
         }
     }
@@ -117,6 +122,7 @@ impl InstantManipulate {
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
+            field_column: self.field_column.clone(),
             input: exec_input,
             metric: ExecutionPlanMetricsSet::new(),
         })
@@ -130,6 +136,7 @@ pub struct InstantManipulateExec {
     lookback_delta: Millisecond,
     interval: Millisecond,
     time_index_column: String,
+    field_column: Option<String>,
 
     input: Arc<dyn ExecutionPlan>,
     metric: ExecutionPlanMetricsSet,
@@ -171,6 +178,7 @@ impl ExecutionPlan for InstantManipulateExec {
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
+            field_column: self.field_column.clone(),
             input: children[0].clone(),
             metric: self.metric.clone(),
         }))
@@ -189,12 +197,18 @@ impl ExecutionPlan for InstantManipulateExec {
             .column_with_name(&self.time_index_column)
             .expect("time index column not found")
             .0;
+        let field_index = self
+            .field_column
+            .as_ref()
+            .and_then(|name| schema.column_with_name(name))
+            .map(|x| x.0);
         Ok(Box::pin(InstantManipulateStream {
             start: self.start,
             end: self.end,
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index,
+            field_index,
             schema,
             input,
             metric: baseline_metric,
@@ -244,6 +258,7 @@ pub struct InstantManipulateStream {
     interval: Millisecond,
     // Column index of TIME INDEX column's position in schema
     time_index: usize,
+    field_index: Option<usize>,
     schema: SchemaRef,
 
     input: SendableRecordBatchStream,
@@ -282,6 +297,11 @@ impl InstantManipulateStream {
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
 
+        // field column for staleness check
+        let field_column = self
+            .field_index
+            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
+
         let mut cursor = 0;
         let aligned_ts = (self.start..=self.end)
             .step_by(self.interval as usize)
@@ -294,7 +314,12 @@
             let curr = ts_column.value(cursor);
             match curr.cmp(&expected_ts) {
                 Ordering::Equal => {
-                    take_indices.push(Some(cursor as u64));
+                    if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
+                        // ignore the NaN value
+                        take_indices.push(None);
+                    } else {
+                        take_indices.push(Some(cursor as u64));
+                    }
                     continue 'next;
                 }
                 Ordering::Greater => break,
@@ -308,6 +333,11 @@
             // then, search backward to lookback
             loop {
                 let curr = ts_column.value(cursor);
+                if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
+                    // if the newest value is NaN, it means the value is stale, so we should not use it
+                    take_indices.push(None);
+                    break;
+                }
                 if curr + self.lookback_delta < expected_ts {
                     // not found in lookback, leave this field blank.
                     take_indices.push(None);
@@ -413,6 +443,7 @@ mod test {
             lookback_delta,
             interval,
             time_index_column: TIME_INDEX_COLUMN.to_string(),
+            field_column: None,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
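The `field_column` plumbed through `InstantManipulate` above lets the instant selector treat a NaN sample as a staleness marker: if the newest sample at or before the evaluation timestamp is NaN, nothing is emitted for that timestamp, even when the sample sits inside the lookback window. A standalone sketch of that selection rule, not part of the patch — plain slices instead of Arrow arrays, and `select_index` with its arguments is a hypothetical stand-in for the stream state:

```rust
/// Pick the newest sample at or before `expected_ts` within `lookback`,
/// treating a NaN value as a staleness marker that hides the series.
fn select_index(ts: &[i64], values: &[f64], expected_ts: i64, lookback: i64) -> Option<usize> {
    // newest sample with timestamp <= expected_ts
    let idx = ts.iter().rposition(|&t| t <= expected_ts)?;
    if values[idx].is_nan() {
        return None; // stale: the most recent value is a staleness marker
    }
    if ts[idx] + lookback < expected_ts {
        return None; // outside the lookback window
    }
    Some(idx)
}

fn main() {
    let ts = [0, 1_000, 2_000];
    let values = [1.0, 2.0, f64::NAN];
    assert_eq!(select_index(&ts, &values, 1_500, 300_000), Some(1));
    // at 2_500 the newest sample is NaN, so the series emits nothing
    assert_eq!(select_index(&ts, &values, 2_500, 300_000), None);
}
```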
diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs
index 7790bdeea338..636afdfd6d4a 100644
--- a/src/promql/src/extension_plan/normalize.rs
+++ b/src/promql/src/extension_plan/normalize.rs
@@ -42,11 +42,12 @@ use crate::extension_plan::Millisecond;
 /// Roughly speaking, this method does these things:
 /// - bias sample's timestamp by offset
 /// - sort the record batch based on timestamp column
-/// - remove NaN values
+/// - remove NaN values (optional)
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct SeriesNormalize {
     offset: Millisecond,
     time_index_column_name: String,
+    need_filter_out_nan: bool,
 
     input: LogicalPlan,
 }
@@ -71,8 +72,8 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
     fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
             f,
-            "PromSeriesNormalize: offset=[{}], time index=[{}]",
-            self.offset, self.time_index_column_name
+            "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
+            self.offset, self.time_index_column_name, self.need_filter_out_nan
         )
     }
 
@@ -82,6 +83,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
         Self {
             offset: self.offset,
             time_index_column_name: self.time_index_column_name.clone(),
+            need_filter_out_nan: self.need_filter_out_nan,
             input: inputs[0].clone(),
         }
     }
@@ -91,11 +93,13 @@ impl SeriesNormalize {
     pub fn new<N: AsRef<str>>(
         offset: Millisecond,
         time_index_column_name: N,
+        need_filter_out_nan: bool,
         input: LogicalPlan,
     ) -> Self {
         Self {
             offset,
             time_index_column_name: time_index_column_name.as_ref().to_string(),
+            need_filter_out_nan,
             input,
         }
     }
@@ -104,6 +108,7 @@ impl SeriesNormalize {
         Arc::new(SeriesNormalizeExec {
             offset: self.offset,
             time_index_column_name: self.time_index_column_name.clone(),
+            need_filter_out_nan: self.need_filter_out_nan,
             input: exec_input,
             metric: ExecutionPlanMetricsSet::new(),
         })
@@ -114,6 +119,7 @@ impl SeriesNormalize {
 pub struct SeriesNormalizeExec {
     offset: Millisecond,
     time_index_column_name: String,
+    need_filter_out_nan: bool,
 
     input: Arc<dyn ExecutionPlan>,
     metric: ExecutionPlanMetricsSet,
@@ -148,6 +154,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
         Ok(Arc::new(Self {
             offset: self.offset,
             time_index_column_name: self.time_index_column_name.clone(),
+            need_filter_out_nan: self.need_filter_out_nan,
             input: children[0].clone(),
             metric: self.metric.clone(),
         }))
@@ -169,6 +176,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
         Ok(Box::pin(SeriesNormalizeStream {
             offset: self.offset,
             time_index,
+            need_filter_out_nan: self.need_filter_out_nan,
             schema,
             input,
             metric: baseline_metric,
@@ -180,8 +188,8 @@ impl ExecutionPlan for SeriesNormalizeExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}]",
-                    self.offset, self.time_index_column_name
+                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
+                    self.offset, self.time_index_column_name, self.need_filter_out_nan
                 )
             }
         }
@@ -200,6 +208,7 @@ pub struct SeriesNormalizeStream {
     offset: Millisecond,
     // Column index of TIME INDEX column's position in schema
     time_index: usize,
+    need_filter_out_nan: bool,
     schema: SchemaRef,
 
     input: SendableRecordBatchStream,
@@ -220,7 +229,7 @@ impl SeriesNormalizeStream {
             ts_column.clone()
         } else {
             TimestampMillisecondArray::from_iter(
-                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
+                ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
             )
         };
         let mut columns = input.columns().to_vec();
@@ -234,6 +243,10 @@
             .collect::>>()?;
         let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;
 
+        if !self.need_filter_out_nan {
+            return Ok(ordered_batch);
+        }
+
         // TODO(ruihang): consider the "special NaN"
         // filter out NaN
         let mut filter = vec![true; input.num_rows()];
@@ -317,6 +330,7 @@ mod test {
         let normalize_exec = Arc::new(SeriesNormalizeExec {
             offset: 0,
             time_index_column_name: TIME_INDEX_COLUMN.to_string(),
+            need_filter_out_nan: true,
             input: memory_exec,
             metric: ExecutionPlanMetricsSet::new(),
         });
@@ -349,6 +363,7 @@ mod test {
         let normalize_exec = Arc::new(SeriesNormalizeExec {
             offset: 1_000, // offset 1s
             time_index_column_name: TIME_INDEX_COLUMN.to_string(),
+            need_filter_out_nan: true,
             input: memory_exec,
             metric: ExecutionPlanMetricsSet::new(),
         });
@@ -364,11 +379,11 @@
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
-            \n| 1969-12-31T23:59:59 | 10.0   | foo  |\
-            \n| 1970-01-01T00:00:29 | 100.0  | foo  |\
-            \n| 1970-01-01T00:00:59 | 0.0    | foo  |\
-            \n| 1970-01-01T00:01:29 | 1000.0 | foo  |\
-            \n| 1970-01-01T00:01:59 | 1.0    | foo  |\
+            \n| 1970-01-01T00:00:01 | 10.0   | foo  |\
+            \n| 1970-01-01T00:00:31 | 100.0  | foo  |\
+            \n| 1970-01-01T00:01:01 | 0.0    | foo  |\
+            \n| 1970-01-01T00:01:31 | 1000.0 | foo  |\
+            \n| 1970-01-01T00:02:01 | 1.0    | foo  |\
            \n+---------------------+--------+------+",
        );
 
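Two normalize changes above work together: the timestamp bias flips from `ts - offset` to `ts + offset`, and NaN filtering becomes optional (the planner keeps NaN for instant selectors so the staleness check can see it, and still drops it for range selectors). For the offset sign, the planner scans data from `offset` milliseconds earlier and normalization shifts those samples forward again so they line up with the evaluation timestamps — a small worked example, not part of the patch, with purely illustrative numbers:

```rust
// How an `offset` modifier is compensated (all values in milliseconds).
fn main() {
    let (start, end, offset) = (60_000_i64, 120_000_i64, 1_000_i64);

    // planner: scan the window that was current `offset` ms ago
    let (scan_start, scan_end) = (start - offset, end - offset);
    assert_eq!((scan_start, scan_end), (59_000, 119_000));

    // normalize: bias the raw sample timestamp forward (`ts + offset`, the fixed sign)
    let raw_sample_ts = 59_000_i64;
    assert_eq!(raw_sample_ts + offset, start); // aligned with the evaluation timestamp again
}
```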
diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs
index 47c23e5d6082..6b4f14b430a2 100644
--- a/src/promql/src/extension_plan/range_manipulate.rs
+++ b/src/promql/src/extension_plan/range_manipulate.rs
@@ -342,12 +342,20 @@ impl Stream for RangeManipulateStream {
     type Item = DataFusionResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let poll = match self.input.poll_next_unpin(cx) {
-            Poll::Ready(batch) => {
-                let _timer = self.metric.elapsed_compute().timer();
-                Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch))))
+        let poll = loop {
+            match self.input.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    let _timer = self.metric.elapsed_compute().timer();
+                    let result = self.manipulate(batch);
+                    if let Ok(None) = result {
+                        continue;
+                    } else {
+                        break Poll::Ready(result.transpose());
+                    }
+                }
+                Poll::Ready(other) => break Poll::Ready(other),
+                Poll::Pending => break Poll::Pending,
             }
-            Poll::Pending => Poll::Pending,
         };
         self.metric.record_poll(poll)
     }
@@ -357,10 +365,14 @@ impl RangeManipulateStream {
     // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
     // But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
     // And the generated timestamp is not aligned to the step. It's expected to do later.
-    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
+    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
         let mut other_columns = (0..input.columns().len()).collect::>();
         // calculate the range
         let (aligned_ts, ranges) = self.calculate_range(&input);
+        // ignore this if all ranges are empty
+        if ranges.iter().all(|(_, len)| *len == 0) {
+            return Ok(None);
+        }
 
         // transform columns
         let mut new_columns = input.columns().to_vec();
@@ -391,6 +403,7 @@
         new_columns[self.time_index] = aligned_ts;
 
         RecordBatch::try_new(self.output_schema.clone(), new_columns)
+            .map(Some)
             .map_err(DataFusionError::ArrowError)
     }
 
diff --git a/src/promql/src/functions/aggr_over_time.rs b/src/promql/src/functions/aggr_over_time.rs
index a8cc99dce072..5c9d0578d248 100644
--- a/src/promql/src/functions/aggr_over_time.rs
+++ b/src/promql/src/functions/aggr_over_time.rs
@@ -73,8 +73,12 @@ pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Op
     ret = "Float64Array",
     display_name = "prom_count_over_time"
 )]
-pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> f64 {
-    values.len() as f64
+pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
+    if values.is_empty() {
+        None
+    } else {
+        Some(values.len() as f64)
+    }
 }
 
 /// The most recent point value in specified interval.
@@ -320,13 +324,13 @@ mod test {
                 Some(2.0),
                 Some(5.0),
                 Some(1.0),
-                Some(0.0),
-                Some(0.0),
+                None,
+                None,
                 Some(3.0),
                 Some(3.0),
                 Some(3.0),
                 Some(1.0),
-                Some(0.0),
+                None,
             ],
         );
 
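`count_over_time` moving from `f64` to `Option<f64>` encodes the same rule as the `Ok(None)` short-circuit in `RangeManipulateStream` above: a window that contains no points produces no output row at all rather than a literal 0, which is why the expected values in the test flip from `Some(0.0)` to `None`. A sketch of that rule outside the `range_fn` macro, not part of the patch:

```rust
// Empty windows yield no sample (None) instead of a count of 0.
fn count_over_time(values: &[f64]) -> Option<f64> {
    if values.is_empty() {
        None
    } else {
        Some(values.len() as f64)
    }
}

fn main() {
    assert_eq!(count_over_time(&[]), None);
    assert_eq!(count_over_time(&[1.0, f64::NAN, 3.0]), Some(3.0));
}
```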
diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs
index 6cbfaa74003c..1f636dcb5394 100644
--- a/src/promql/src/lib.rs
+++ b/src/promql/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #![feature(option_get_or_insert_default)]
+#![feature(let_chains)]
 
 pub mod error;
 pub mod extension_plan;
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 055de9fe9912..5620a8220d1f 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -300,7 +300,7 @@ impl PromPlanner {
                 let matchers = self.preprocess_label_matchers(matchers)?;
                 self.setup_context().await?;
                 let normalize = self
-                    .selector_to_series_normalize_plan(offset, matchers)
+                    .selector_to_series_normalize_plan(offset, matchers, false)
                     .await?;
                 let manipulate = InstantManipulate::new(
                     self.ctx.start,
@@ -311,6 +311,7 @@ impl PromPlanner {
                         .time_index_column
                         .clone()
                         .expect("time index should be set in `setup_context`"),
+                    self.ctx.field_columns.get(0).cloned(),
                     normalize,
                 );
                 LogicalPlan::Extension(Extension {
@@ -332,7 +333,7 @@ impl PromPlanner {
                 self.ctx.range = Some(range_ms);
 
                 let normalize = self
-                    .selector_to_series_normalize_plan(offset, matchers)
+                    .selector_to_series_normalize_plan(offset, matchers, true)
                     .await?;
                 let manipulate = RangeManipulate::new(
                     self.ctx.start,
@@ -420,29 +421,34 @@
         &mut self,
         offset: &Option<Offset>,
         label_matchers: Matchers,
+        is_range_selector: bool,
     ) -> Result<LogicalPlan> {
         let table_name = self.ctx.table_name.clone().unwrap();
 
         // make filter exprs
-        let offset_duration = -match offset {
+        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
-        let mut filters = self.matchers_to_expr(label_matchers)?;
-        filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
+        let range_ms = self.ctx.range.unwrap_or_default();
+        let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?;
+        scan_filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
             ScalarValue::TimestampMillisecond(
-                Some(self.ctx.start - offset_duration - self.ctx.lookback_delta),
+                Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range_ms),
                 None,
             ),
         )));
-        filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
-            ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
+        scan_filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
+            ScalarValue::TimestampMillisecond(
+                Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
+                None,
+            ),
         )));
 
         // make table scan with filter exprs
         let mut table_scan = self
-            .create_table_scan_plan(&table_name, filters.clone())
+            .create_table_scan_plan(&table_name, scan_filters.clone())
             .await?;
 
         // make a projection plan if there is any `__field__` matcher
@@ -514,9 +520,14 @@
         }
 
         // make filter and sort plan
-        let sort_plan = LogicalPlanBuilder::from(table_scan)
-            .filter(utils::conjunction(filters.into_iter()).unwrap())
-            .context(DataFusionPlanningSnafu)?
+        let mut plan_builder = LogicalPlanBuilder::from(table_scan);
+        let accurate_filters = self.matchers_to_expr(label_matchers)?;
+        if !accurate_filters.is_empty() {
+            plan_builder = plan_builder
+                .filter(utils::conjunction(accurate_filters).unwrap())
+                .context(DataFusionPlanningSnafu)?;
+        }
+        let sort_plan = plan_builder
             .sort(self.create_tag_and_time_index_column_sort_exprs()?)
             .context(DataFusionPlanningSnafu)?
.build() @@ -534,6 +545,7 @@ impl PromPlanner { .time_index_column .clone() .with_context(|| TimeIndexNotFoundSnafu { table: table_name })?, + is_range_selector, divide_plan, ); let logical_plan = LogicalPlan::Extension(Extension { @@ -1270,11 +1282,11 @@ mod test { "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\ \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 != Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1474,11 +1486,11 @@ mod test { "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS 
LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + \n Filter: some_metric.tag_0 != Utf8(\"bar\") [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", plan_name); assert_eq!( plan.display_indent_schema().to_string(), @@ -1499,11 +1511,11 @@ mod test { "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + \n Filter: some_metric.tag_0 != Utf8(\"bar\") [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), 
timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected_without); } @@ -1621,17 +1633,17 @@ mod test { \n Inner Join: lhs.tag_0 = some_metric.tag_0, lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), 
field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1663,11 +1675,11 @@ mod test { let expected = String::from( "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await; @@ -1688,11 +1700,10 @@ mod test { let expected = String::from( "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[timestamp >= 
TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await; @@ -1713,11 +1724,10 @@ mod test { let expected = String::from( "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await; @@ -1730,11 +1740,10 @@ mod test { "Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND 
some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-301000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await; @@ -1746,11 +1755,10 @@ mod test { let expected = String::from( "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await; @@ -1763,11 +1771,10 @@ mod test { "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, 
timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-301000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); indie_query_plan_compare(query, expected).await;
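The new `TableScan` timestamp bounds in these expected plans follow from the widened scan window in `selector_to_series_normalize_plan`: the lower bound subtracts both the lookback delta and (for range selectors) the range, and the upper bound adds the lookback delta. A quick check, not part of the patch, using the test parameters (start = 0, end = 100000000 ms, lookback = 1000 ms, range = 300000 ms for the `[5m]` selectors):

```rust
// Reproduce the bounds that appear in the expected TableScan filters.
fn scan_bounds(start: i64, end: i64, offset: i64, lookback: i64, range: i64) -> (i64, i64) {
    // mirrors: start - offset - lookback - range .. end - offset + lookback
    (start - offset - lookback - range, end - offset + lookback)
}

fn main() {
    // instant selectors: only the lookback widens the window
    assert_eq!(scan_bounds(0, 100_000_000, 0, 1_000, 0), (-1_000, 100_001_000));
    // range selectors ([5m] = 300_000 ms): the range widens the lower bound too
    assert_eq!(
        scan_bounds(0, 100_000_000, 0, 1_000, 300_000),
        (-301_000, 100_001_000)
    );
}
```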