diff --git a/src/promql/src/engine/evaluator.rs b/src/promql/src/engine/evaluator.rs deleted file mode 100644 index 5352e6d9c51b..000000000000 --- a/src/promql/src/engine/evaluator.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use promql_parser::parser::{Expr, Value}; - -use crate::engine::Context; -use crate::error::Result; - -/// An evaluator evaluates given expressions over given fixed timestamps. -pub struct Evaluator {} - -impl Evaluator { - pub fn eval(_ctx: &Context, _expr: &Expr) -> Result> { - unimplemented!(); - } -} diff --git a/src/promql/src/engine/functions.rs b/src/promql/src/engine/functions.rs deleted file mode 100644 index 99a4a29db140..000000000000 --- a/src/promql/src/engine/functions.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! PromQL functions diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 57808f555d86..185764e26d3e 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -94,6 +94,12 @@ pub enum Error { #[snafu(backtrace)] source: catalog::error::Error, }, + + #[snafu(display("Expect a range selector, but not found"))] + ExpectRangeSelector { backtrace: Backtrace }, + + #[snafu(display("Zero range in range selector"))] + ZeroRangeSelector { backtrace: Backtrace }, } impl ErrorExt for Error { @@ -105,7 +111,9 @@ impl ErrorExt for Error { | UnsupportedExpr { .. } | UnexpectedToken { .. } | MultipleVector { .. } - | ExpectExpr { .. } => StatusCode::InvalidArguments, + | ExpectExpr { .. } + | ExpectRangeSelector { .. } + | ZeroRangeSelector { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | DataFusionPlanning { .. } diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index f39fbcd4b49e..f775e8459d1f 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -42,6 +42,11 @@ use crate::range_array::RangeArray; /// /// This plan will "fold" time index and value columns into [RangeArray]s, and truncate /// other columns to the same length with the "folded" [RangeArray] column. +/// +/// To pass runtime information to the execution plan (or the range function), This plan +/// will add those extra columns: +/// - timestamp range with type [RangeArray], which is the folded timestamp column. +/// - end of current range with the same type as the timestamp column. (todo) #[derive(Debug, PartialEq, Eq, Hash)] pub struct RangeManipulate { start: Millisecond, @@ -79,14 +84,18 @@ impl RangeManipulate { }) } - pub fn range_timestamp_name(&self) -> String { - Self::build_timestamp_range_name(&self.time_index) - } - pub fn build_timestamp_range_name(time_index: &str) -> String { format!("{time_index}_range") } + pub fn internal_range_end_col_name() -> String { + "__internal_range_end".to_string() + } + + fn range_timestamp_name(&self) -> String { + Self::build_timestamp_range_name(&self.time_index) + } + fn calculate_output_schema( input_schema: &DFSchemaRef, time_index: &str, @@ -96,10 +105,10 @@ impl RangeManipulate { // process time index column // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last. - let Some(index) = input_schema.index_of_column_by_name(None, time_index)? else { + let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else { return Err(datafusion::common::field_not_found(None, time_index, input_schema.as_ref())) }; - let timestamp_range_field = columns[index] + let timestamp_range_field = columns[ts_col_index] .field() .clone() .with_name(Self::build_timestamp_range_name(time_index)); diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index 86e9c3db83c9..ceba5c6181c7 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -15,8 +15,8 @@ mod aggr_over_time; mod changes; mod deriv; +mod extrapolate_rate; mod idelta; -mod increase; mod resets; #[cfg(test)] mod test_util; @@ -28,8 +28,8 @@ pub use aggr_over_time::{ use datafusion::arrow::array::ArrayRef; use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; +pub use extrapolate_rate::{Delta, Increase, Rate}; pub use idelta::IDelta; -pub use increase::Increase; pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result { if let ColumnarValue::Array(array) = columnar_value { diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs new file mode 100644 index 000000000000..b41248dfc700 --- /dev/null +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -0,0 +1,565 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file also contains some code from prometheus project. +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementations of `rate`, `increase` and `delta` functions in PromQL. + +use std::fmt::Display; +use std::sync::Arc; + +use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::TimeUnit; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::DataType; + +use crate::extension_plan::Millisecond; +use crate::functions::extract_array; +use crate::range_array::RangeArray; + +pub type Delta = ExtrapolatedRate; +pub type Rate = ExtrapolatedRate; +pub type Increase = ExtrapolatedRate; + +/// Part of the `extrapolatedRate` in Promql, +/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66 +#[derive(Debug)] +pub struct ExtrapolatedRate { + /// Range duration in millisecond + range_length: i64, +} + +impl ExtrapolatedRate { + /// Constructor. Other public usage should use [`scalar_udf`] instead. + fn new(range_length: i64) -> Self { + Self { range_length } + } + + fn input_type() -> Vec { + vec![ + // timestamp range vector + RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + // value range vector + RangeArray::convert_data_type(DataType::Float64), + // timestamp vector + DataType::Timestamp(TimeUnit::Millisecond, None), + ] + } + + fn return_type() -> DataType { + DataType::Float64 + } + + fn calc(&self, input: &[ColumnarValue]) -> Result { + assert_eq!(input.len(), 3); + + // construct matrix from input + let ts_array = extract_array(&input[0])?; + let ts_range = RangeArray::try_new(ts_array.data().clone().into())?; + let value_array = extract_array(&input[1])?; + let value_range = RangeArray::try_new(value_array.data().clone().into())?; + let ts = extract_array(&input[2])?; + let ts = ts + .as_any() + .downcast_ref::() + .unwrap(); + + // calculation + let mut result_array = Vec::with_capacity(ts_range.len()); + for index in 0..ts_range.len() { + let timestamps = ts_range.get(index).unwrap(); + let timestamps = timestamps + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let end_ts = ts.value(index); + let values = value_range.get(index).unwrap(); + let values = values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + if values.len() < 2 { + result_array.push(None); + continue; + } + + // refer to functions.go L83-L110 + let mut result_value = values.last().unwrap() - values.first().unwrap(); + if IS_COUNTER { + for window in values.windows(2) { + let prev = window[0]; + let curr = window[1]; + if curr < prev { + result_value += prev + } + } + } + + let mut factor = Self::extrapolate_factor( + timestamps, + end_ts, + self.range_length, + *values.first().unwrap(), + result_value, + ); + + if IS_RATE { + // safety: range_length is checked to be non-zero in the planner. + factor /= self.range_length as f64 / 1000.0; + } + + result_array.push(Some(result_value * factor)); + } + + let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + Ok(result) + } + + fn extrapolate_factor( + timestamps: &[Millisecond], + range_end: Millisecond, + range_length: Millisecond, + // the following two parameters are for counters. + // see functions.go L121 - L127 + first_value: f64, + result_value: f64, + ) -> f64 { + // result_value + // refer to functions.go extrapolatedRate fn + // assume offset is processed (and it should be processed in normalize plan) + let range_start = range_end - range_length; + let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0; + let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0; + let sampled_interval = + (timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0; + let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64; + + // functions.go L122 - L134. quote: + // Counters cannot be negative. If we have any slope at + // all (i.e. resultValue went up), we can extrapolate + // the zero point of the counter. If the duration to the + // zero point is shorter than the durationToStart, we + // take the zero point as the start of the series, + // thereby avoiding extrapolation to negative counter + // values. + if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 { + let duration_to_zero = sampled_interval * (first_value / result_value); + if duration_to_zero < duration_to_start { + duration_to_start = duration_to_zero; + } + } + + let extrapolation_threshold = average_duration_between_samples * 1.1; + let mut extrapolate_to_interval = sampled_interval; + + if duration_to_start < extrapolation_threshold { + extrapolate_to_interval += duration_to_start; + } else { + extrapolate_to_interval += average_duration_between_samples / 2.0; + } + if duration_to_end < extrapolation_threshold { + extrapolate_to_interval += duration_to_end; + } else { + extrapolate_to_interval += average_duration_between_samples / 2.0; + } + + extrapolate_to_interval / sampled_interval + } +} + +// delta +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_delta" + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), + } + } +} + +// rate +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_rate" + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), + } + } +} + +// increase +impl ExtrapolatedRate { + pub fn name() -> &'static str { + "prom_increase" + } + + pub fn scalar_udf(range_length: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(range_length).calc(input)), + } + } +} + +impl Display for ExtrapolatedRate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Delta Function") + } +} + +impl Display for ExtrapolatedRate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Rate Function") + } +} + +impl Display for ExtrapolatedRate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Increase Function") + } +} + +#[cfg(test)] +mod test { + + use datafusion::arrow::array::ArrayRef; + + use super::*; + + /// Range length is fixed to 5 + fn extrapolated_rate_runner( + ts_range: RangeArray, + value_range: RangeArray, + timestamps: ArrayRef, + expected: Vec, + ) { + let input = vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(timestamps), + ]; + let output = extract_array( + &ExtrapolatedRate::::new(5) + .calc(&input) + .unwrap(), + ) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert_eq!(output, expected); + } + + #[test] + fn increase_abnormal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter([ + Some(2), + Some(5), + Some(2), + Some(6), + Some(9), + None, + ])) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0], + ); + } + + #[test] + fn increase_normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + // `2.0` is because that `duration_to_zero` less than `extrapolation_threshold` + vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5], + ); + } + + #[test] + fn increase_short_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 1), + (1, 0), + (2, 1), + (3, 0), + (4, 3), + (5, 1), + (6, 0), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter([ + Some(1), + None, + Some(3), + None, + Some(7), + Some(6), + None, + Some(9), + ])) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5], + ); + } + + #[test] + fn increase_counter_reset() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + // that two `2.0` is because `duration_to_start` are shrunk to to + // `duration_to_zero`, and causes `duration_to_zero` less than + // `extrapolation_threshold`. + vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5], + ); + } + + #[test] + fn rate_counter_reset() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0], + ); + } + + #[test] + fn rate_normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0], + ); + } + + #[test] + fn delta_counter_reset() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + // delta doesn't handle counter reset, thus there is a negative value + vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5], + ); + } + + #[test] + fn delta_normal_input() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5], + ); + } +} diff --git a/src/promql/src/functions/increase.rs b/src/promql/src/functions/increase.rs deleted file mode 100644 index c0c087584e1b..000000000000 --- a/src/promql/src/functions/increase.rs +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt::Display; -use std::sync::Arc; - -use datafusion::arrow::array::Float64Array; -use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; -use datafusion::physical_plan::ColumnarValue; -use datatypes::arrow::array::Array; -use datatypes::arrow::datatypes::DataType; - -use crate::functions::extract_array; -use crate::range_array::RangeArray; - -/// Part of the `extrapolatedRate` in Promql, -/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66 -#[derive(Debug)] -pub struct Increase {} - -impl Increase { - pub fn name() -> &'static str { - "prom_increase" - } - - fn input_type() -> DataType { - RangeArray::convert_data_type(DataType::Float64) - } - - fn return_type() -> DataType { - DataType::Float64 - } - - fn calc(input: &[ColumnarValue]) -> Result { - // construct matrix from input - assert_eq!(input.len(), 1); - let input_array = extract_array(input.first().unwrap())?; - let array_data = input_array.data().clone(); - let range_array: RangeArray = RangeArray::try_new(array_data.into())?; - - // calculation - let mut result_array = Vec::with_capacity(range_array.len()); - for index in 0..range_array.len() { - let range = range_array.get(index).unwrap(); - let range = range - .as_any() - .downcast_ref::() - .unwrap() - .values(); - - if range.len() < 2 { - result_array.push(None); - continue; - } - - // refer to functions.go L83-L110 - let mut result_value = range.last().unwrap() - range.first().unwrap(); - for window in range.windows(2) { - let prev = window[0]; - let curr = window[1]; - if curr < prev { - result_value += prev - } - } - - result_array.push(Some(result_value)); - } - - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); - Ok(result) - } - - pub fn scalar_udf() -> ScalarUDF { - ScalarUDF { - name: Self::name().to_string(), - signature: Signature::new( - TypeSignature::Exact(vec![Self::input_type()]), - Volatility::Immutable, - ), - return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), - fun: Arc::new(Self::calc), - } - } -} - -impl Display for Increase { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("PromQL Increase Function") - } -} - -#[cfg(test)] -mod test { - - use super::*; - - fn increase_runner(input: RangeArray, expected: Vec) { - let input = vec![ColumnarValue::Array(Arc::new(input.into_dict()))]; - let output = extract_array(&Increase::calc(&input).unwrap()) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .values() - .to_vec(); - assert_eq!(output, expected); - } - - #[test] - fn abnormal_input() { - let values_array = Arc::new(Float64Array::from_iter([ - 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, - ])); - let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 4.0, 0.0, 2.0, 0.0, 0.0]); - } - - #[test] - fn normal_input() { - let values_array = Arc::new(Float64Array::from_iter([ - 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, - ])); - let ranges = [ - (0, 2), - (1, 2), - (2, 2), - (3, 2), - (4, 2), - (5, 2), - (6, 2), - (7, 2), - ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); - } - - #[test] - fn short_input() { - let values_array = Arc::new(Float64Array::from_iter([ - 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, - ])); - let ranges = [ - (0, 1), - (1, 0), - (2, 1), - (3, 0), - (4, 3), - (5, 1), - (6, 0), - (7, 2), - ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]); - } - - #[test] - fn counter_reset() { - // this series should be treated [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] - let values_array = Arc::new(Float64Array::from_iter([ - 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, - ])); - let ranges = [ - (0, 2), - (1, 2), - (2, 2), - (3, 2), - (4, 2), - (5, 2), - (6, 2), - (7, 2), - ]; - let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); - increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); - } -} diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index c4b6d796b102..7436f820ec3b 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod engine; pub mod error; pub mod extension_plan; pub mod functions; diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 92396f0be6e0..5a19eb28d082 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -41,16 +41,17 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; use crate::error::{ - CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, - TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, - UnsupportedExprSnafu, ValueNotFoundSnafu, + CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu, + MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, + UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu, + ZeroRangeSelectorSnafu, }; use crate::extension_plan::{ EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, }; use crate::functions::{ - AbsentOverTime, AvgOverTime, CountOverTime, IDelta, Increase, LastOverTime, MaxOverTime, - MinOverTime, PresentOverTime, SumOverTime, + AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime, + MinOverTime, PresentOverTime, Rate, SumOverTime, }; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; @@ -74,6 +75,8 @@ struct PromPlannerContext { time_index_column: Option, value_columns: Vec, tag_columns: Vec, + /// The range in millisecond of range selector. None if there is no range selector. + range: Option, } impl PromPlannerContext { @@ -317,6 +320,11 @@ impl PromPlanner { } = vector_selector; let matchers = self.preprocess_label_matchers(matchers)?; self.setup_context().await?; + + ensure!(!range.is_zero(), ZeroRangeSelectorSnafu); + let range_ms = range.as_millis() as _; + self.ctx.range = Some(range_ms); + let normalize = self .selector_to_series_normalize_plan(offset, matchers) .await?; @@ -325,7 +333,7 @@ impl PromPlanner { self.ctx.end, self.ctx.interval, // TODO(ruihang): convert via Timestamp datatypes to support different time units - range.as_millis() as _, + range_ms, self.ctx .time_index_column .clone() @@ -668,7 +676,15 @@ impl PromPlanner { // TODO(ruihang): set this according to in-param list let value_column_pos = 0; let scalar_func = match func.name { - "increase" => ScalarFunc::Udf(Increase::scalar_udf()), + "increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), + "rate" => ScalarFunc::ExtrapolateUdf(Rate::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), + "delta" => ScalarFunc::ExtrapolateUdf(Delta::scalar_udf( + self.ctx.range.context(ExpectRangeSelectorSnafu)?, + )), "idelta" => ScalarFunc::Udf(IDelta::::scalar_udf()), "irate" => ScalarFunc::Udf(IDelta::::scalar_udf()), "avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()), @@ -720,6 +736,25 @@ impl PromPlanner { other_input_exprs.remove(value_column_pos + 1); other_input_exprs.remove(value_column_pos); } + ScalarFunc::ExtrapolateUdf(fun) => { + let ts_range_expr = DfExpr::Column(Column::from_name( + RangeManipulate::build_timestamp_range_name( + self.ctx.time_index_column.as_ref().unwrap(), + ), + )); + other_input_exprs.insert(value_column_pos, ts_range_expr); + other_input_exprs.insert(value_column_pos + 1, col_expr); + other_input_exprs + .insert(value_column_pos + 2, self.create_time_index_column_expr()?); + let fn_expr = DfExpr::ScalarUDF { + fun: Arc::new(fun), + args: other_input_exprs.clone(), + }; + exprs.push(fn_expr); + other_input_exprs.remove(value_column_pos + 2); + other_input_exprs.remove(value_column_pos + 1); + other_input_exprs.remove(value_column_pos); + } } } @@ -1032,6 +1067,9 @@ struct FunctionArgs { enum ScalarFunc { DataFusionBuiltin(BuiltinScalarFunction), Udf(ScalarUDF), + // todo(ruihang): maybe merge with Udf later + /// UDF that require extra information like range length to be evaluated. + ExtrapolateUdf(ScalarUDF), } #[cfg(test)] @@ -1600,8 +1638,8 @@ mod test { async fn increase_aggr() { let query = "increase(some_metric[5m])"; let expected = String::from( - "Filter: prom_increase(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ - \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0) AS prom_increase(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + "Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\