Skip to content

Commit

Permalink
feat: support time() and related functions in PromQL (#2854)
Browse files Browse the repository at this point in the history
* enhance empty_metric

Signed-off-by: Ruihang Xia <[email protected]>

* implementation

Signed-off-by: Ruihang Xia <[email protected]>

* fix lhs & rhs

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* fix typo, update sqlness

Signed-off-by: Ruihang Xia <[email protected]>

* remove deadcode

Signed-off-by: Ruihang Xia <[email protected]>

* add cast to bool modifier

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 4, 2023
1 parent 58c1373 commit de41646
Show file tree
Hide file tree
Showing 5 changed files with 729 additions and 70 deletions.
94 changes: 68 additions & 26 deletions src/promql/src/extension_plan/empty_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema, TimeUnit};
use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -48,7 +49,7 @@ pub struct EmptyMetric {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
expr: Expr,
expr: Option<Expr>,
/// Schema that only contains the time index column.
/// This is for intermediate result only.
time_index_schema: DFSchemaRef,
Expand All @@ -63,17 +64,20 @@ impl EmptyMetric {
interval: Millisecond,
time_index_column_name: String,
field_column_name: String,
field_expr: Expr,
field_expr: Option<Expr>,
) -> DataFusionResult<Self> {
let ts_only_schema = build_ts_only_schema(&time_index_column_name);
let field_data_type = field_expr.get_type(&ts_only_schema)?;
let schema = Arc::new(DFSchema::new_with_metadata(
vec![
ts_only_schema.field(0).clone(),
DFField::new(Some(""), &field_column_name, field_data_type, true),
],
HashMap::new(),
)?);
let mut fields = vec![ts_only_schema.field(0).clone()];
if let Some(field_expr) = &field_expr {
let field_data_type = field_expr.get_type(&ts_only_schema)?;
fields.push(DFField::new(
Some(""),
&field_column_name,
field_data_type,
true,
));
}
let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);

Ok(Self {
start,
Expand All @@ -94,12 +98,18 @@ impl EmptyMetric {
session_state: &SessionState,
physical_planner: &dyn PhysicalPlanner,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let physical_expr = physical_planner.create_physical_expr(
&self.expr,
&self.result_schema,
&ArrowSchema::from(self.result_schema.as_ref()),
session_state,
)?;
let physical_expr = self
.expr
.as_ref()
.map(|expr| {
physical_planner.create_physical_expr(
expr,
&self.result_schema,
&ArrowSchema::from(self.result_schema.as_ref()),
session_state,
)
})
.transpose()?;

Ok(Arc::new(EmptyMetricExec {
start: self.start,
Expand Down Expand Up @@ -153,7 +163,7 @@ pub struct EmptyMetricExec {
time_index_schema: SchemaRef,
/// Schema of the output record batch
result_schema: SchemaRef,
expr: PhysicalExprRef,
expr: Option<PhysicalExprRef>,

metric: ExecutionPlanMetricsSet,
}
Expand Down Expand Up @@ -241,7 +251,7 @@ pub struct EmptyMetricStream {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
expr: PhysicalExprRef,
expr: Option<PhysicalExprRef>,
/// This stream only generate one record batch at the first poll
is_first_poll: bool,
/// Schema that only contains the time index column.
Expand Down Expand Up @@ -272,20 +282,24 @@ impl Stream for EmptyMetricStream {
.step_by(self.interval as _)
.collect::<Vec<_>>();
let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
let num_rows = time_array.len();
let input_record_batch =
RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
.map_err(DataFusionError::ArrowError)?;
let mut result_arrays: Vec<ArrayRef> = vec![time_array];

// evaluate the field expr and get the result
let field_array = self
.expr
.evaluate(&input_record_batch)?
.into_array(time_array.len());
if let Some(field_expr) = &self.expr {
result_arrays.push(
field_expr
.evaluate(&input_record_batch)?
.into_array(num_rows),
);
}

// assemble the output record batch
let batch =
RecordBatch::try_new(self.result_schema.clone(), vec![time_array, field_array])
.map_err(DataFusionError::ArrowError);
let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
.map_err(DataFusionError::ArrowError);

Poll::Ready(Some(batch))
} else {
Expand Down Expand Up @@ -344,7 +358,7 @@ mod test {
interval,
time_column_name,
field_column_name,
time_expr,
Some(time_expr),
)
.unwrap();
let empty_metric_exec = empty_metric
Expand Down Expand Up @@ -455,4 +469,32 @@ mod test {
)
.await
}

#[tokio::test]
async fn no_field_expr() {
let session_context = SessionContext::default();
let df_default_physical_planner = DefaultPhysicalPlanner::default();
let empty_metric =
EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
let empty_metric_exec = empty_metric
.to_execution_plan(&session_context.state(), &df_default_physical_planner)
.unwrap();

let result =
datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();

let expected = String::from(
"+---------------------+\
\n| time |\
\n+---------------------+\
\n| 1970-01-01T00:00:00 |\
\n+---------------------+",
);
assert_eq!(result_literal, expected);
}
}
Loading

0 comments on commit de41646

Please sign in to comment.