From c8f07637930cb535fbfc2a916a57d7a7bb51e6a8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 Mar 2023 10:55:22 +0800 Subject: [PATCH 1/7] clean up Signed-off-by: Ruihang Xia --- src/promql/src/engine/evaluator.rs | 29 ----------------------------- src/promql/src/engine/functions.rs | 15 --------------- src/promql/src/lib.rs | 1 - 3 files changed, 45 deletions(-) delete mode 100644 src/promql/src/engine/evaluator.rs delete mode 100644 src/promql/src/engine/functions.rs diff --git a/src/promql/src/engine/evaluator.rs b/src/promql/src/engine/evaluator.rs deleted file mode 100644 index 5352e6d9c51b..000000000000 --- a/src/promql/src/engine/evaluator.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use promql_parser::parser::{Expr, Value}; - -use crate::engine::Context; -use crate::error::Result; - -/// An evaluator evaluates given expressions over given fixed timestamps. -pub struct Evaluator {} - -impl Evaluator { - pub fn eval(_ctx: &Context, _expr: &Expr) -> Result> { - unimplemented!(); - } -} diff --git a/src/promql/src/engine/functions.rs b/src/promql/src/engine/functions.rs deleted file mode 100644 index 99a4a29db140..000000000000 --- a/src/promql/src/engine/functions.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! PromQL functions diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index c4b6d796b102..7436f820ec3b 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod engine; pub mod error; pub mod extension_plan; pub mod functions; From a64e949500c0c37ce84b82d44cea759278e9fd2d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 Mar 2023 17:39:50 +0800 Subject: [PATCH 2/7] fix increase fn Signed-off-by: Ruihang Xia --- src/promql/src/error.rs | 6 +- .../src/extension_plan/range_manipulate.rs | 21 +- src/promql/src/functions/increase.rs | 247 +++++++++++++++--- src/promql/src/planner.rs | 42 ++- 4 files changed, 270 insertions(+), 46 deletions(-) diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 57808f555d86..0e0d3ef53236 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -94,6 +94,9 @@ pub enum Error { #[snafu(backtrace)] source: catalog::error::Error, }, + + #[snafu(display("Expect a range selector, but not found"))] + ExpectRangeSelector { backtrace: Backtrace }, } impl ErrorExt for Error { @@ -105,7 +108,8 @@ impl ErrorExt for Error { | UnsupportedExpr { .. } | UnexpectedToken { .. } | MultipleVector { .. } - | ExpectExpr { .. } => StatusCode::InvalidArguments, + | ExpectExpr { .. } + | ExpectRangeSelector { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | DataFusionPlanning { .. } diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index f39fbcd4b49e..f775e8459d1f 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -42,6 +42,11 @@ use crate::range_array::RangeArray; /// /// This plan will "fold" time index and value columns into [RangeArray]s, and truncate /// other columns to the same length with the "folded" [RangeArray] column. +/// +/// To pass runtime information to the execution plan (or the range function), This plan +/// will add those extra columns: +/// - timestamp range with type [RangeArray], which is the folded timestamp column. +/// - end of current range with the same type as the timestamp column. (todo) #[derive(Debug, PartialEq, Eq, Hash)] pub struct RangeManipulate { start: Millisecond, @@ -79,14 +84,18 @@ impl RangeManipulate { }) } - pub fn range_timestamp_name(&self) -> String { - Self::build_timestamp_range_name(&self.time_index) - } - pub fn build_timestamp_range_name(time_index: &str) -> String { format!("{time_index}_range") } + pub fn internal_range_end_col_name() -> String { + "__internal_range_end".to_string() + } + + fn range_timestamp_name(&self) -> String { + Self::build_timestamp_range_name(&self.time_index) + } + fn calculate_output_schema( input_schema: &DFSchemaRef, time_index: &str, @@ -96,10 +105,10 @@ impl RangeManipulate { // process time index column // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last. - let Some(index) = input_schema.index_of_column_by_name(None, time_index)? else { + let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else { return Err(datafusion::common::field_not_found(None, time_index, input_schema.as_ref())) }; - let timestamp_range_field = columns[index] + let timestamp_range_field = columns[ts_col_index] .field() .clone() .with_name(Self::build_timestamp_range_name(time_index)); diff --git a/src/promql/src/functions/increase.rs b/src/promql/src/functions/increase.rs index c0c087584e1b..0185afc78169 100644 --- a/src/promql/src/functions/increase.rs +++ b/src/promql/src/functions/increase.rs @@ -12,62 +12,106 @@ // See the License for the specific language governing permissions and // limitations under the License. +// This file also contains some code from prometheus project. +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::fmt::Display; use std::sync::Arc; -use datafusion::arrow::array::Float64Array; +use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; +use crate::extension_plan::Millisecond; use crate::functions::extract_array; use crate::range_array::RangeArray; /// Part of the `extrapolatedRate` in Promql, /// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66 #[derive(Debug)] -pub struct Increase {} +pub struct Increase { + range_length: i64, +} impl Increase { + /// Constructor. Other public usage should use [`scalar_udf`] instead. + fn new(range_length: i64) -> Self { + Self { range_length } + } + pub fn name() -> &'static str { "prom_increase" } - fn input_type() -> DataType { - RangeArray::convert_data_type(DataType::Float64) + fn input_type() -> Vec { + vec![ + // timestamp range vector + RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + // value range vector + RangeArray::convert_data_type(DataType::Float64), + // timestamp vector + DataType::Timestamp(TimeUnit::Millisecond, None), + ] } fn return_type() -> DataType { DataType::Float64 } - fn calc(input: &[ColumnarValue]) -> Result { + fn calc(&self, input: &[ColumnarValue]) -> Result { + assert_eq!(input.len(), 3); + // construct matrix from input - assert_eq!(input.len(), 1); - let input_array = extract_array(input.first().unwrap())?; - let array_data = input_array.data().clone(); - let range_array: RangeArray = RangeArray::try_new(array_data.into())?; + let ts_array = extract_array(&input[0])?; + let ts_range = RangeArray::try_new(ts_array.data().clone().into())?; + let value_array = extract_array(&input[1])?; + let value_range = RangeArray::try_new(value_array.data().clone().into())?; + let ts = extract_array(&input[2])?; + let ts = ts + .as_any() + .downcast_ref::() + .unwrap(); // calculation - let mut result_array = Vec::with_capacity(range_array.len()); - for index in 0..range_array.len() { - let range = range_array.get(index).unwrap(); - let range = range + let mut result_array = Vec::with_capacity(ts_range.len()); + for index in 0..ts_range.len() { + let timestamps = ts_range.get(index).unwrap(); + let timestamps = timestamps + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let end_ts = ts.value(index); + let values = value_range.get(index).unwrap(); + let values = values .as_any() .downcast_ref::() .unwrap() .values(); - if range.len() < 2 { + if values.len() < 2 { result_array.push(None); continue; } // refer to functions.go L83-L110 - let mut result_value = range.last().unwrap() - range.first().unwrap(); - for window in range.windows(2) { + let mut result_value = values.last().unwrap() - values.first().unwrap(); + for window in values.windows(2) { let prev = window[0]; let curr = window[1]; if curr < prev { @@ -75,22 +119,85 @@ impl Increase { } } - result_array.push(Some(result_value)); + let factor = Self::extrapolate_factor( + timestamps, + end_ts, + self.range_length, + Some(*values.first().unwrap()), + Some(result_value), + ); + + result_array.push(Some(result_value * factor)); } let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); Ok(result) } - pub fn scalar_udf() -> ScalarUDF { + fn extrapolate_factor( + timestamps: &[Millisecond], + range_end: Millisecond, + range_length: Millisecond, + // the following two parameters are for counters. + // see functions.go L121 - L127 + first_value: Option, + result_value: Option, + ) -> f64 { + // result_value + // refer to functions.go extrapolatedRate fn + // assume offset is processed (and it should be processed in normalize plan) + let range_start = range_end - range_length; + let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0; + let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0; + let sampled_interval = + (timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0; + let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64; + + // functions.go L122 - L134. quote: + // Counters cannot be negative. If we have any slope at + // all (i.e. resultValue went up), we can extrapolate + // the zero point of the counter. If the duration to the + // zero point is shorter than the durationToStart, we + // take the zero point as the start of the series, + // thereby avoiding extrapolation to negative counter + // values. + if let Some(first_value) = first_value { + if let Some(result_value) = result_value { + if result_value > 0.0 && first_value > 0.0 { + let duration_to_zero = sampled_interval * (first_value / result_value); + if duration_to_zero < duration_to_start { + duration_to_start = duration_to_zero; + } + } + } + } + + let extrapolation_threshold = average_duration_between_samples * 1.1; + let mut extrapolate_to_interval = sampled_interval; + + if duration_to_start < extrapolation_threshold { + extrapolate_to_interval += duration_to_start; + } else { + extrapolate_to_interval += average_duration_between_samples / 2.0; + } + if duration_to_end < extrapolation_threshold { + extrapolate_to_interval += duration_to_end; + } else { + extrapolate_to_interval += average_duration_between_samples / 2.0; + } + + extrapolate_to_interval / sampled_interval + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { ScalarUDF { name: Self::name().to_string(), signature: Signature::new( - TypeSignature::Exact(vec![Self::input_type()]), + TypeSignature::Exact(Self::input_type()), Volatility::Immutable, ), return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), - fun: Arc::new(Self::calc), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), } } } @@ -104,11 +211,23 @@ impl Display for Increase { #[cfg(test)] mod test { + use datafusion::arrow::array::ArrayRef; + use super::*; - fn increase_runner(input: RangeArray, expected: Vec) { - let input = vec![ColumnarValue::Array(Arc::new(input.into_dict()))]; - let output = extract_array(&Increase::calc(&input).unwrap()) + /// Range length is fixed to 5 + fn increase_runner( + ts_range: RangeArray, + value_range: RangeArray, + timestamps: ArrayRef, + expected: Vec, + ) { + let input = vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(timestamps), + ]; + let output = extract_array(&Increase::new(5).calc(&input).unwrap()) .unwrap() .as_any() .downcast_ref::() @@ -120,16 +239,36 @@ mod test { #[test] fn abnormal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); let values_array = Arc::new(Float64Array::from_iter([ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, ])); let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 4.0, 0.0, 2.0, 0.0, 0.0]); + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter([ + Some(2), + Some(5), + Some(2), + Some(6), + Some(9), + None, + ])) as _; + increase_runner( + ts_range, + value_range, + timestamps, + vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0], + ); } #[test] fn normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); let values_array = Arc::new(Float64Array::from_iter([ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, ])); @@ -143,12 +282,25 @@ mod test { (6, 2), (7, 2), ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + increase_runner( + ts_range, + value_range, + timestamps, + // `2.0` is because that `duration_to_zero` less than `extrapolation_threshold` + vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5], + ); } #[test] fn short_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); let values_array = Arc::new(Float64Array::from_iter([ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, ])); @@ -162,13 +314,32 @@ mod test { (6, 0), (7, 2), ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]); + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter([ + Some(1), + None, + Some(3), + None, + Some(7), + Some(6), + None, + Some(9), + ])) as _; + increase_runner( + ts_range, + value_range, + timestamps, + vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5], + ); } #[test] fn counter_reset() { - // this series should be treated [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] let values_array = Arc::new(Float64Array::from_iter([ 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, ])); @@ -182,7 +353,19 @@ mod test { (6, 2), (7, 2), ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + increase_runner( + ts_range, + value_range, + timestamps, + // that two `2.0` is because `duration_to_start` are shrinked to to + // `duration_to_zero`, and causes `duration_to_zero` less than + // `extrapolation_threshold`. + vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5], + ); } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 92396f0be6e0..88c5fa4ccfbb 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -41,9 +41,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; use crate::error::{ - CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, - TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, - UnsupportedExprSnafu, ValueNotFoundSnafu, + CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu, + MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, + UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu, }; use crate::extension_plan::{ EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, @@ -74,6 +74,8 @@ struct PromPlannerContext { time_index_column: Option, value_columns: Vec, tag_columns: Vec, + /// The range in millisecond of range selector. None if there is no range selector. + range: Option, } impl PromPlannerContext { @@ -317,6 +319,8 @@ impl PromPlanner { } = vector_selector; let matchers = self.preprocess_label_matchers(matchers)?; self.setup_context().await?; + let range_ms = range.as_millis() as _; + self.ctx.range = Some(range_ms); let normalize = self .selector_to_series_normalize_plan(offset, matchers) .await?; @@ -325,7 +329,7 @@ impl PromPlanner { self.ctx.end, self.ctx.interval, // TODO(ruihang): convert via Timestamp datatypes to support different time units - range.as_millis() as _, + range_ms, self.ctx .time_index_column .clone() @@ -668,7 +672,9 @@ impl PromPlanner { // TODO(ruihang): set this according to in-param list let value_column_pos = 0; let scalar_func = match func.name { - "increase" => ScalarFunc::Udf(Increase::scalar_udf()), + "increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), "idelta" => ScalarFunc::Udf(IDelta::::scalar_udf()), "irate" => ScalarFunc::Udf(IDelta::::scalar_udf()), "avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()), @@ -720,6 +726,25 @@ impl PromPlanner { other_input_exprs.remove(value_column_pos + 1); other_input_exprs.remove(value_column_pos); } + ScalarFunc::ExtrapolateUdf(fun) => { + let ts_range_expr = DfExpr::Column(Column::from_name( + RangeManipulate::build_timestamp_range_name( + self.ctx.time_index_column.as_ref().unwrap(), + ), + )); + other_input_exprs.insert(value_column_pos, ts_range_expr); + other_input_exprs.insert(value_column_pos + 1, col_expr); + other_input_exprs + .insert(value_column_pos + 2, self.create_time_index_column_expr()?); + let fn_expr = DfExpr::ScalarUDF { + fun: Arc::new(fun), + args: other_input_exprs.clone(), + }; + exprs.push(fn_expr); + other_input_exprs.remove(value_column_pos + 2); + other_input_exprs.remove(value_column_pos + 1); + other_input_exprs.remove(value_column_pos); + } } } @@ -1032,6 +1057,9 @@ struct FunctionArgs { enum ScalarFunc { DataFusionBuiltin(BuiltinScalarFunction), Udf(ScalarUDF), + // todo(ruihang): maybe merge with Udf later + /// UDF that require extra information like range length to be evaluated. + ExtrapolateUdf(ScalarUDF), } #[cfg(test)] @@ -1600,8 +1628,8 @@ mod test { async fn increase_aggr() { let query = "increase(some_metric[5m])"; let expected = String::from( - "Filter: prom_increase(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ - \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0) AS prom_increase(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + "Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ From eca22d30e2e29e2051b064a9508a4fe09d094368 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 Mar 2023 18:08:11 +0800 Subject: [PATCH 3/7] impl rate and delta Signed-off-by: Ruihang Xia --- src/promql/src/functions.rs | 4 +- .../{increase.rs => extrapolate_rate.rs} | 112 ++++++++++++++---- src/promql/src/planner.rs | 10 +- 3 files changed, 97 insertions(+), 29 deletions(-) rename src/promql/src/functions/{increase.rs => extrapolate_rate.rs} (81%) diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index 86e9c3db83c9..ceba5c6181c7 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -15,8 +15,8 @@ mod aggr_over_time; mod changes; mod deriv; +mod extrapolate_rate; mod idelta; -mod increase; mod resets; #[cfg(test)] mod test_util; @@ -28,8 +28,8 @@ pub use aggr_over_time::{ use datafusion::arrow::array::ArrayRef; use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; +pub use extrapolate_rate::{Delta, Increase, Rate}; pub use idelta::IDelta; -pub use increase::Increase; pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result { if let ColumnarValue::Array(array) = columnar_value { diff --git a/src/promql/src/functions/increase.rs b/src/promql/src/functions/extrapolate_rate.rs similarity index 81% rename from src/promql/src/functions/increase.rs rename to src/promql/src/functions/extrapolate_rate.rs index 0185afc78169..5a16dbbdc79a 100644 --- a/src/promql/src/functions/increase.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -26,6 +26,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Implementations of `rate`, `increase` and `delta` functions in PromQL. + use std::fmt::Display; use std::sync::Arc; @@ -41,23 +43,24 @@ use crate::extension_plan::Millisecond; use crate::functions::extract_array; use crate::range_array::RangeArray; +pub type Delta = ExtrapolatedRate; +pub type Rate = ExtrapolatedRate; +pub type Increase = ExtrapolatedRate; + /// Part of the `extrapolatedRate` in Promql, /// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66 #[derive(Debug)] -pub struct Increase { +pub struct ExtrapolatedRate { + /// Range duration in millisecond range_length: i64, } -impl Increase { +impl ExtrapolatedRate { /// Constructor. Other public usage should use [`scalar_udf`] instead. fn new(range_length: i64) -> Self { Self { range_length } } - pub fn name() -> &'static str { - "prom_increase" - } - fn input_type() -> Vec { vec![ // timestamp range vector @@ -111,22 +114,28 @@ impl Increase { // refer to functions.go L83-L110 let mut result_value = values.last().unwrap() - values.first().unwrap(); - for window in values.windows(2) { - let prev = window[0]; - let curr = window[1]; - if curr < prev { - result_value += prev + if !IS_COUNTER { + for window in values.windows(2) { + let prev = window[0]; + let curr = window[1]; + if curr < prev { + result_value += prev + } } } - let factor = Self::extrapolate_factor( + let mut factor = Self::extrapolate_factor( timestamps, end_ts, self.range_length, - Some(*values.first().unwrap()), - Some(result_value), + *values.first().unwrap(), + result_value, ); + if IS_RATE { + factor /= self.range_length as f64 / 1000.0; + } + result_array.push(Some(result_value * factor)); } @@ -140,8 +149,8 @@ impl Increase { range_length: Millisecond, // the following two parameters are for counters. // see functions.go L121 - L127 - first_value: Option, - result_value: Option, + first_value: f64, + result_value: f64, ) -> f64 { // result_value // refer to functions.go extrapolatedRate fn @@ -161,14 +170,10 @@ impl Increase { // take the zero point as the start of the series, // thereby avoiding extrapolation to negative counter // values. - if let Some(first_value) = first_value { - if let Some(result_value) = result_value { - if result_value > 0.0 && first_value > 0.0 { - let duration_to_zero = sampled_interval * (first_value / result_value); - if duration_to_zero < duration_to_start { - duration_to_start = duration_to_zero; - } - } + if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 { + let duration_to_zero = sampled_interval * (first_value / result_value); + if duration_to_zero < duration_to_start { + duration_to_start = duration_to_zero; } } @@ -188,6 +193,51 @@ impl Increase { extrapolate_to_interval / sampled_interval } +} + +// delta +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_delta" + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), + } + } +} + +// rate +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_rate" + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), + } + } +} + +// increase +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_increase" + } pub fn scalar_udf(range_length: i64) -> ScalarUDF { ScalarUDF { @@ -202,7 +252,19 @@ impl Increase { } } -impl Display for Increase { +impl Display for ExtrapolatedRate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Delta Function") + } +} + +impl Display for ExtrapolatedRate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Rate Function") + } +} + +impl Display for ExtrapolatedRate { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("PromQL Increase Function") } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 88c5fa4ccfbb..0d6b4f0a798a 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -49,8 +49,8 @@ use crate::extension_plan::{ EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, }; use crate::functions::{ - AbsentOverTime, AvgOverTime, CountOverTime, IDelta, Increase, LastOverTime, MaxOverTime, - MinOverTime, PresentOverTime, SumOverTime, + AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime, + MinOverTime, PresentOverTime, Rate, SumOverTime, }; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; @@ -675,6 +675,12 @@ impl PromPlanner { "increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf( self.ctx.range.context(ExpectRangeSelectorSnafu)?, )), + "rate" => ScalarFunc::ExtrapolateUdf(Rate::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), + "delta" => ScalarFunc::ExtrapolateUdf(Delta::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), "idelta" => ScalarFunc::Udf(IDelta::::scalar_udf()), "irate" => ScalarFunc::Udf(IDelta::::scalar_udf()), "avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()), From 56bd1dff891e74ba162b55506c5473105d33ac49 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 Mar 2023 18:14:08 +0800 Subject: [PATCH 4/7] fix typo Signed-off-by: Ruihang Xia --- src/promql/src/functions/extrapolate_rate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 5a16dbbdc79a..82a14557551c 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -424,7 +424,7 @@ mod test { ts_range, value_range, timestamps, - // that two `2.0` is because `duration_to_start` are shrinked to to + // that two `2.0` is because `duration_to_start` are shrunk to to // `duration_to_zero`, and causes `duration_to_zero` less than // `extrapolation_threshold`. vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5], From 1dbd7814c7f35a11a79cfda38cf8502ffb2b8c16 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 Mar 2023 19:15:35 +0800 Subject: [PATCH 5/7] fix IS_RATE condition Signed-off-by: Ruihang Xia --- src/promql/src/functions/extrapolate_rate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 82a14557551c..56b545b6fa46 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -114,7 +114,7 @@ impl ExtrapolatedRate Date: Mon, 27 Mar 2023 19:25:13 +0800 Subject: [PATCH 6/7] more tests about rate and delta Signed-off-by: Ruihang Xia --- src/promql/src/functions/extrapolate_rate.rs | 163 +++++++++++++++++-- 1 file changed, 147 insertions(+), 16 deletions(-) diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 56b545b6fa46..148d42c1440f 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -278,7 +278,7 @@ mod test { use super::*; /// Range length is fixed to 5 - fn increase_runner( + fn extrapolated_rate_runner( ts_range: RangeArray, value_range: RangeArray, timestamps: ArrayRef, @@ -289,18 +289,22 @@ mod test { ColumnarValue::Array(Arc::new(value_range.into_dict())), ColumnarValue::Array(timestamps), ]; - let output = extract_array(&Increase::new(5).calc(&input).unwrap()) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .values() - .to_vec(); + let output = extract_array( + &ExtrapolatedRate::::new(5) + .calc(&input) + .unwrap(), + ) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); assert_eq!(output, expected); } #[test] - fn abnormal_input() { + fn increase_abnormal_input() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )); @@ -318,7 +322,7 @@ mod test { Some(9), None, ])) as _; - increase_runner( + extrapolated_rate_runner::( ts_range, value_range, timestamps, @@ -327,7 +331,7 @@ mod test { } #[test] - fn normal_input() { + fn increase_normal_input() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )); @@ -349,7 +353,7 @@ mod test { let timestamps = Arc::new(TimestampMillisecondArray::from_iter( [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )) as _; - increase_runner( + extrapolated_rate_runner::( ts_range, value_range, timestamps, @@ -359,7 +363,7 @@ mod test { } #[test] - fn short_input() { + fn increase_short_input() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )); @@ -388,7 +392,7 @@ mod test { None, Some(9), ])) as _; - increase_runner( + extrapolated_rate_runner::( ts_range, value_range, timestamps, @@ -397,7 +401,7 @@ mod test { } #[test] - fn counter_reset() { + fn increase_counter_reset() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )); @@ -420,7 +424,7 @@ mod test { let timestamps = Arc::new(TimestampMillisecondArray::from_iter( [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), )) as _; - increase_runner( + extrapolated_rate_runner::( ts_range, value_range, timestamps, @@ -430,4 +434,131 @@ mod test { vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5], ); } + + #[test] + fn rate_counter_reset() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0], + ); + } + + #[test] + fn rate_normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0], + ); + } + + #[test] + fn delta_counter_reset() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + // delta doesn't handle counter reset, thus there is a negative value + vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5], + ); + } + + #[test] + fn delta_normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5], + ); + } } From 726eae4128a5418d770630585d99351ebdebf3cd Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 28 Mar 2023 14:47:06 +0800 Subject: [PATCH 7/7] ensure range_length is not zero Signed-off-by: Ruihang Xia --- src/promql/src/error.rs | 6 +++++- src/promql/src/functions/extrapolate_rate.rs | 1 + src/promql/src/planner.rs | 4 ++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 0e0d3ef53236..185764e26d3e 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -97,6 +97,9 @@ pub enum Error { #[snafu(display("Expect a range selector, but not found"))] ExpectRangeSelector { backtrace: Backtrace }, + + #[snafu(display("Zero range in range selector"))] + ZeroRangeSelector { backtrace: Backtrace }, } impl ErrorExt for Error { @@ -109,7 +112,8 @@ impl ErrorExt for Error { | UnexpectedToken { .. } | MultipleVector { .. } | ExpectExpr { .. } - | ExpectRangeSelector { .. } => StatusCode::InvalidArguments, + | ExpectRangeSelector { .. } + | ZeroRangeSelector { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | DataFusionPlanning { .. } diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 148d42c1440f..b41248dfc700 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -133,6 +133,7 @@ impl ExtrapolatedRate