Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implelemt rate, increase and delta in PromQL #1258

Merged
merged 7 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions src/promql/src/engine/evaluator.rs

This file was deleted.

15 changes: 0 additions & 15 deletions src/promql/src/engine/functions.rs

This file was deleted.

10 changes: 9 additions & 1 deletion src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ pub enum Error {
#[snafu(backtrace)]
source: catalog::error::Error,
},

#[snafu(display("Expect a range selector, but not found"))]
ExpectRangeSelector { backtrace: Backtrace },

#[snafu(display("Zero range in range selector"))]
ZeroRangeSelector { backtrace: Backtrace },
}

impl ErrorExt for Error {
Expand All @@ -105,7 +111,9 @@ impl ErrorExt for Error {
| UnsupportedExpr { .. }
| UnexpectedToken { .. }
| MultipleVector { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,
| ExpectExpr { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. } => StatusCode::InvalidArguments,

UnknownTable { .. }
| DataFusionPlanning { .. }
Expand Down
21 changes: 15 additions & 6 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ use crate::range_array::RangeArray;
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length with the "folded" [RangeArray] column.
///
/// To pass runtime information to the execution plan (or the range function), This plan
/// will add those extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
start: Millisecond,
Expand Down Expand Up @@ -79,14 +84,18 @@ impl RangeManipulate {
})
}

pub fn range_timestamp_name(&self) -> String {
Self::build_timestamp_range_name(&self.time_index)
}

pub fn build_timestamp_range_name(time_index: &str) -> String {
format!("{time_index}_range")
}

pub fn internal_range_end_col_name() -> String {
"__internal_range_end".to_string()
}

fn range_timestamp_name(&self) -> String {
Self::build_timestamp_range_name(&self.time_index)
}

fn calculate_output_schema(
input_schema: &DFSchemaRef,
time_index: &str,
Expand All @@ -96,10 +105,10 @@ impl RangeManipulate {

// process time index column
// the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
let Some(index) = input_schema.index_of_column_by_name(None, time_index)? else {
let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
return Err(datafusion::common::field_not_found(None, time_index, input_schema.as_ref()))
};
let timestamp_range_field = columns[index]
let timestamp_range_field = columns[ts_col_index]
.field()
.clone()
.with_name(Self::build_timestamp_range_name(time_index));
Expand Down
4 changes: 2 additions & 2 deletions src/promql/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
mod aggr_over_time;
mod changes;
mod deriv;
mod extrapolate_rate;
mod idelta;
mod increase;
mod resets;
#[cfg(test)]
mod test_util;
Expand All @@ -28,8 +28,8 @@ pub use aggr_over_time::{
use datafusion::arrow::array::ArrayRef;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
pub use extrapolate_rate::{Delta, Increase, Rate};
pub use idelta::IDelta;
pub use increase::Increase;

pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {
Expand Down
Loading