Skip to content

Commit

Permalink
chore: change range expr name(e.g. MAX(v) RANGE 5s FILL 6)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange committed Oct 11, 2023
1 parent ed08a29 commit 28cd34e
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 270 deletions.
38 changes: 16 additions & 22 deletions src/query/src/range_select/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,28 +254,22 @@ impl RangeSelect {
DFSchema::new_with_metadata(by_fields, input.schema().metadata().clone())
.context(DataFusionSnafu)?,
);
// If the result of the project plan happens to be the schema of the range plan, no project plan is required
// that need project is identical to range plan schema.
// 1. all exprs in project must belong to range schema
// 2. range schema and project exprs must have same size
let schema_project = if projection_expr.len() == schema_before_project.fields().len() {
projection_expr
.iter()
.map(|project_expr| {
if let Expr::Column(column) = project_expr {
schema_before_project
.index_of_column_by_name(column.relation.as_ref(), &column.name)
.unwrap_or(None)
.ok_or(())
} else {
Err(())
}
})
.collect::<std::result::Result<Vec<usize>, ()>>()
.ok()
} else {
None
};
// If the results of project plan can be obtained directly from range plan without any additional calculations, no project plan is required.
// We can simply project the final output of the range plan to produce the final result.
let schema_project = projection_expr
.iter()
.map(|project_expr| {
if let Expr::Column(column) = project_expr {
schema_before_project
.index_of_column_by_name(column.relation.as_ref(), &column.name)
.unwrap_or(None)
.ok_or(())
} else {
Err(())
}
})
.collect::<std::result::Result<Vec<usize>, ()>>()
.ok();
let schema = if let Some(project) = &schema_project {
let project_field = project
.iter()
Expand Down
47 changes: 26 additions & 21 deletions src/query/src/range_select/plan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> {
self.align = align;
}
let range_fn = RangeFn {
name: format!("{} {} {}", range_expr.display_name()?, range_str, fill),
name: format!(
"{} RANGE {} FILL {}",
range_expr.display_name()?,
range_str,
fill
),
data_type,
expr: range_expr,
range: parse_duration(range_str).map_err(DataFusionError::Plan)?,
Expand Down Expand Up @@ -418,7 +423,7 @@ mod test {
async fn range_no_project() {
let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) 5m NULL], align=3600s time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) 5m NULL:Float64;N]\
"RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -428,8 +433,8 @@ mod test {
async fn range_expr_calculation() {
let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: AVG(test.field_0 + test.field_1) 5m NULL / Int64(4) [AVG(test.field_0 + test.field_1) 5m NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) 5m NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -440,8 +445,8 @@ mod test {
let query =
r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
let expected = String::from(
"Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) 5m NULL / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) 5m NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) 5m NULL], align=3600s time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
"Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -451,8 +456,8 @@ mod test {
async fn range_calculation() {
let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
let expected = String::from(
"Projection: (AVG(test.field_0) 5m NULL + SUM(test.field_1) 5m NULL) / Int64(4) [AVG(test.field_0) 5m NULL + SUM(test.field_1) 5m NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0) 5m NULL, SUM(test.field_1) 5m NULL], align=3600s time_index=timestamp [AVG(test.field_0) 5m NULL:Float64;N, SUM(test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) [AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -464,8 +469,8 @@ mod test {
let expected = String::from(
"Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
\n Filter: foo > Int64(1) [foo:Float64;N]\
\n Projection: (AVG(test.field_0) 5m NULL + SUM(test.field_1) 5m NULL) / Int64(4) AS foo [foo:Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0) 5m NULL, SUM(test.field_1) 5m NULL], align=3600s time_index=timestamp [AVG(test.field_0) 5m NULL:Float64;N, SUM(test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -475,8 +480,8 @@ mod test {
async fn range_from_nest_query() {
let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
let expected = String::from(
"Projection: (AVG(a) 5m NULL + SUM(b) 5m NULL) / Int64(4) [AVG(a) 5m NULL + SUM(b) 5m NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(a) 5m NULL, SUM(b) 5m NULL], align=3600s time_index=timestamp [AVG(a) 5m NULL:Float64;N, SUM(b) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
"Projection: (AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL) / Int64(4) [AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(a) RANGE 5m FILL NULL, SUM(b) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(a) RANGE 5m FILL NULL:Float64;N, SUM(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
\n Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
Expand All @@ -488,8 +493,8 @@ mod test {
async fn range_in_expr() {
let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: sin(AVG(test.field_0 + test.field_1) 5m NULL + Int64(1)) [sin(AVG(test.field_0 + test.field_1) 5m NULL + Int64(1)):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) 5m NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -499,8 +504,8 @@ mod test {
async fn duplicate_range_expr() {
let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: AVG(test.field_0) 5m 6 + AVG(test.field_0) 5m 6 [AVG(test.field_0) 5m 6 + AVG(test.field_0) 5m 6:Float64]\
\n RangeSelect: range_exprs=[AVG(test.field_0) 5m 6], align=3600s time_index=timestamp [AVG(test.field_0) 5m 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6 [AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6:Float64]\
\n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL 6], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -510,8 +515,8 @@ mod test {
async fn deep_nest_range_expr() {
let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: round(sin(AVG(test.field_0 + test.field_1) 5m NULL + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) 5m NULL + Int64(1))):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) 5m NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) 5m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -521,8 +526,8 @@ mod test {
async fn complex_range_expr() {
let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
let expected = String::from(
"Projection: gcd(CAST(MAX(test.field_0 + Int64(1)) 5m NULL AS Int64), CAST(test.tag_0 AS Int64)) + round(MAX(test.field_2 + Int64(1)) 6m NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) 10m NULL * CAST(test.tag_1 AS Float64) + Int64(1) [gcd(MAX(test.field_0 + Int64(1)) 5m NULL,test.tag_0) + round(MAX(test.field_2 + Int64(1)) 6m NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) 10m NULL * test.tag_1 + Int64(1):Float64;N]\
\n RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) 5m NULL, MAX(test.field_2 + Int64(1)) 6m NULL, MAX(test.field_2 + Int64(3)) 10m NULL], align=3600s time_index=timestamp [MAX(test.field_0 + Int64(1)) 5m NULL:Float64;N, MAX(test.field_2 + Int64(1)) 6m NULL:Float64;N, MAX(test.field_2 + Int64(3)) 10m NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: gcd(CAST(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL AS Int64), CAST(test.tag_0 AS Int64)) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * CAST(test.tag_1 AS Float64) + Int64(1) [gcd(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL,test.tag_0) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * test.tag_1 + Int64(1):Float64;N]\
\n RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600s time_index=timestamp [MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -532,7 +537,7 @@ mod test {
async fn range_linear_on_integer() {
let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"RangeSelect: range_exprs=[MIN(test.field_0 + test.field_1) 5m LINEAR], align=3600s time_index=timestamp [MIN(test.field_0 + test.field_1) 5m LINEAR:Float64;N]\
"RangeSelect: range_exprs=[MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR], align=3600s time_index=timestamp [MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR:Float64;N]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand Down
Loading

0 comments on commit 28cd34e

Please sign in to comment.