Skip to content

Commit

Permalink
feat: add align to / interval support in range query (#2842)
Browse files Browse the repository at this point in the history
* feat: add align to / interval support in range query

* chore: fix ci

* chore: simplify `parse_duration_expr`

* chore: change s to ms
  • Loading branch information
Taylor-lagrange authored Dec 4, 2023
1 parent f78dab0 commit 806400c
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 67 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ serde_json = "1.0"
smallvec = "1"
snafu = "0.7"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(int_roundings)]

pub mod dataframe;
pub mod datafusion;
Expand Down
69 changes: 57 additions & 12 deletions src/query/src/range_select/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
use futures::{ready, Stream};
use futures_util::StreamExt;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};

use crate::error::{DataFusionSnafu, Result};
use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result};

type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

Expand Down Expand Up @@ -147,7 +147,7 @@ impl Fill {

#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
/// with format like `max(a) 300s null`
/// with format like `max(a) RANGE 300s FILL NULL`
pub name: String,
pub data_type: DataType,
pub expr: Expr,
Expand Down Expand Up @@ -197,6 +197,7 @@ pub struct RangeSelect {
/// all range expressions
pub range_expr: Vec<RangeFn>,
pub align: Duration,
pub align_to: i64,
pub time_index: String,
pub by: Vec<Expr>,
pub schema: DFSchemaRef,
Expand All @@ -216,10 +217,28 @@ impl RangeSelect {
input: Arc<LogicalPlan>,
range_expr: Vec<RangeFn>,
align: Duration,
align_to: i64,
time_index: Expr,
by: Vec<Expr>,
projection_expr: &[Expr],
) -> Result<Self> {
ensure!(
align.as_millis() != 0,
RangeQuerySnafu {
msg: "Can't use 0 as align in Range Query"
}
);
for expr in &range_expr {
ensure!(
expr.range.as_millis() != 0,
RangeQuerySnafu {
msg: format!(
"Invalid Range expr `{}`, Can't use 0 as range in Range Query",
expr.name
)
}
);
}
let mut fields = range_expr
.iter()
.map(
Expand Down Expand Up @@ -289,6 +308,7 @@ impl RangeSelect {
input,
range_expr,
align,
align_to,
time_index: time_index_name,
schema,
by_schema,
Expand Down Expand Up @@ -322,13 +342,19 @@ impl UserDefinedLogicalNodeCore for RangeSelect {
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RangeSelect: range_exprs=[{}], align={}s time_index={}",
"RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
self.range_expr
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
self.align.as_secs(),
self.align.as_millis(),
self.align_to,
self.by
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
self.time_index
)
}
Expand All @@ -338,6 +364,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect {

Self {
align: self.align,
align_to: self.align_to,
range_expr: self.range_expr.clone(),
input: Arc::new(inputs[0].clone()),
time_index: self.time_index.clone(),
Expand Down Expand Up @@ -463,6 +490,7 @@ impl RangeSelect {
input: exec_input,
range_exec,
align: self.align.as_millis() as Millisecond,
align_to: self.align_to,
by: self.create_physical_expr_list(
&self.by,
input_dfschema,
Expand Down Expand Up @@ -493,6 +521,7 @@ pub struct RangeSelectExec {
input: Arc<dyn ExecutionPlan>,
range_exec: Vec<RangeFnExec>,
align: Millisecond,
align_to: i64,
time_index: String,
by: Vec<Arc<dyn PhysicalExpr>>,
schema: SchemaRef,
Expand All @@ -510,16 +539,24 @@ impl DisplayAs for RangeSelectExec {
let range_expr_strs: Vec<String> = self
.range_exec
.iter()
.map(|e| format!("RangeFnExec{{ {}, range: {:?}}}", e.expr.name(), e.range))
.map(|e| {
format!(
"{} RANGE {}s FILL {}",
e.expr.name(),
e.range / 1000,
e.fill
)
})
.collect();
let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
write!(
f,
"range_expr=[{}], align={}, time_index={}, by=[{}]",
"range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
range_expr_strs.join(", "),
self.align,
self.align_to,
by.join(", "),
self.time_index,
by.join(", ")
)?;
}
}
Expand Down Expand Up @@ -563,6 +600,7 @@ impl ExecutionPlan for RangeSelectExec {
time_index: self.time_index.clone(),
by: self.by.clone(),
align: self.align,
align_to: self.align_to,
schema: self.schema.clone(),
by_schema: self.by_schema.clone(),
metric: self.metric.clone(),
Expand Down Expand Up @@ -599,6 +637,7 @@ impl ExecutionPlan for RangeSelectExec {
random_state: RandomState::new(),
time_index,
align: self.align,
align_to: self.align_to,
by: self.by.clone(),
series_map: HashMap::new(),
exec_state: ExecutionState::ReadingInput,
Expand Down Expand Up @@ -629,6 +668,7 @@ struct RangeSelectStream {
time_index: usize,
/// the unit of `align` is millisecond
align: Millisecond,
align_to: i64,
by: Vec<Arc<dyn PhysicalExpr>>,
exec_state: ExecutionState,
/// Converter for the by values
Expand Down Expand Up @@ -657,11 +697,13 @@ struct SeriesState {
align_ts_accumulator: HashMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}

/// According to `align`, produces a calendar-based aligned time.
/// Use `align_to` as time origin.
/// According to `align` as time interval, produces aligned time.
/// Combining the parameters related to the range query,
/// determine for each `Accumulator` `(hash, align_ts)` define,
/// which rows of data will be applied to it.
fn align_to_calendar(
fn produce_align_time(
align_to: i64,
range: Millisecond,
align: Millisecond,
ts_column: &TimestampMillisecondArray,
Expand All @@ -672,7 +714,8 @@ fn align_to_calendar(
// make modify_map for range_fn[i]
for (row, hash) in by_columns_hash.iter().enumerate() {
let ts = ts_column.value(row);
let mut align_ts = ((ts + align - 1) / align) * align;
let ith_slot = (ts - align_to).div_ceil(align);
let mut align_ts = ith_slot * align + align_to;
while align_ts - range < ts && ts <= align_ts {
modify_map
.entry((*hash, align_ts))
Expand Down Expand Up @@ -733,7 +776,8 @@ impl RangeSelectStream {
for i in 0..self.range_exec.len() {
let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
// use self.modify_map record (hash, align_ts) => [row_nums]
align_to_calendar(
produce_align_time(
self.align_to,
self.range_exec[i].range,
self.align,
ts_column_ref,
Expand Down Expand Up @@ -1065,6 +1109,7 @@ mod test {
},
],
align,
align_to: 0,
by: vec![Arc::new(Column::new("host", 2))],
time_index: TIME_INDEX_COLUMN.to_string(),
schema: schema.clone(),
Expand Down
Loading

0 comments on commit 806400c

Please sign in to comment.