Skip to content

Commit

Permalink
fix: use linear interpolation to implement range LINEAR fill strategy (
Browse files Browse the repository at this point in the history
…#2903)

* fix: use linear interpolation to implement range LINEAR fill strategy

* chore: update test case

* chore: optimize linear interpolation implementation

* chore: update test and add comment
  • Loading branch information
Taylor-lagrange authored Dec 13, 2023
1 parent c13d2fd commit 370ec04
Showing 1 changed file with 166 additions and 60 deletions.
226 changes: 166 additions & 60 deletions src/query/src/range_select/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ impl Fill {
}

/// The input `data` contains data on a complete time series.
/// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `data` is ascending time order.
pub fn apply_fill_strategy(&self, data: &mut [ScalarValue]) -> DfResult<()> {
/// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `ts`&`data` is ascending time order.
pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
let len = data.len();
if *self == Fill::Linear {
return Self::fill_linear(ts, data);
}
for i in 0..len {
if data[i].is_null() {
match self {
Expand All @@ -117,32 +120,101 @@ impl Fill {
data[i] = data[i - 1].clone()
}
}
Fill::Linear => {
if 0 < i && i < len - 1 {
match (&data[i - 1], &data[i + 1]) {
(ScalarValue::Float64(Some(a)), ScalarValue::Float64(Some(b))) => {
data[i] = ScalarValue::Float64(Some((a + b) / 2.0));
}
(ScalarValue::Float32(Some(a)), ScalarValue::Float32(Some(b))) => {
data[i] = ScalarValue::Float32(Some((a + b) / 2.0));
}
(a, b) => {
if !a.is_null() && !b.is_null() {
return Err(DataFusionError::Execution(
"RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string()));
} else {
continue;
}
}
}
}
}
// The calculation of linear interpolation is relatively complicated.
// `Self::fill_linear` is used to dispose `Fill::Linear`.
Fill::Linear => unreachable!(),
Fill::Const(v) => data[i] = v.clone(),
}
}
}
Ok(())
}

fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
let not_null_num = data
.iter()
.fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
// We need at least two non-empty data points to perform linear interpolation
if not_null_num < 2 {
return Ok(());
}
let mut index = 0;
let mut head: Option<usize> = None;
let mut tail: Option<usize> = None;
while index < data.len() {
// find null interval [start, end)
// start is null, end is not-null
let start = data[index..]
.iter()
.position(ScalarValue::is_null)
.unwrap_or(data.len() - index)
+ index;
if start == data.len() {
break;
}
let end = data[start..]
.iter()
.position(|r| !r.is_null())
.unwrap_or(data.len() - start)
+ start;
index = end + 1;
// head or tail null dispose later, record start/end first
if start == 0 {
head = Some(end);
} else if end == data.len() {
tail = Some(start);
} else {
linear_interpolation(ts, data, start - 1, end, start, end)?;
}
}
// dispose head null interval
if let Some(end) = head {
linear_interpolation(ts, data, end, end + 1, 0, end)?;
}
// dispose tail null interval
if let Some(start) = tail {
linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
}
Ok(())
}
}

/// use `(ts[i1], data[i1])`, `(ts[i2], data[i2])` as endpoint, linearly interpolates element over the interval `[start, end)`
fn linear_interpolation(
ts: &[i64],
data: &mut [ScalarValue],
i1: usize,
i2: usize,
start: usize,
end: usize,
) -> DfResult<()> {
let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
(ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
(ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
(*y0 as f64, *y1 as f64, true)
}
_ => {
return Err(DataFusionError::Execution(
"RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
));
}
};
// To avoid divide zero error, kind of defensive programming
if x1 == x0 {
return Err(DataFusionError::Execution(
"RangePlan: Linear interpolation using the same coordinate points".to_string(),
));
}
for i in start..end {
let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
data[i] = if is_float32 {
ScalarValue::Float32(Some(val as f32))
} else {
ScalarValue::Float64(Some(val))
}
}
Ok(())
}

#[derive(Eq, Clone, Debug)]
Expand Down Expand Up @@ -859,25 +931,16 @@ impl RangeSelectStream {
} in self.series_map.values()
{
// collect data on time series
if !need_sort_output {
for (ts, accumulators) in align_ts_accumulator {
for (i, accumulator) in accumulators.iter().enumerate() {
all_scalar[i].push(accumulator.evaluate()?);
}
ts_builder.append_value(*ts);
}
} else {
let mut keys = align_ts_accumulator.keys().copied().collect::<Vec<_>>();
keys.sort();
for key in &keys {
for (i, accumulator) in
align_ts_accumulator.get(key).unwrap().iter().enumerate()
{
all_scalar[i].push(accumulator.evaluate()?);
}
let mut align_ts = align_ts_accumulator.keys().copied().collect::<Vec<_>>();
if need_sort_output {
align_ts.sort();
}
for ts in &align_ts {
for (i, accumulator) in align_ts_accumulator.get(ts).unwrap().iter().enumerate() {
all_scalar[i].push(accumulator.evaluate()?);
}
ts_builder.append_slice(&keys);
}
ts_builder.append_slice(&align_ts);
// apply fill strategy on time series
for (
i,
Expand All @@ -891,7 +954,7 @@ impl RangeSelectStream {
if let Some(data_type) = need_cast {
cast_scalar_values(time_series_data, data_type)?;
}
fill.apply_fill_strategy(time_series_data)?;
fill.apply_fill_strategy(&align_ts, time_series_data)?;
}
by_rows.resize(by_rows.len() + align_ts_accumulator.len(), row.row());
start_index += align_ts_accumulator.len();
Expand Down Expand Up @@ -1220,13 +1283,13 @@ mod test {
\n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
\n| 1.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
\n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
\n| 2.0 | | 1970-01-01T00:00:25 | host1 |\
\n| 2.0 | 2.5 | 1970-01-01T00:00:25 | host1 |\
\n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
\n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
\n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
\n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
\n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
\n| 5.0 | | 1970-01-01T00:00:25 | host2 |\
\n| 5.0 | 5.5 | 1970-01-01T00:00:25 | host2 |\
\n+------------+------------+---------------------+-------+",
);
do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, true, expected).await;
Expand Down Expand Up @@ -1266,13 +1329,13 @@ mod test {
\n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
\n| 1.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
\n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
\n| 2.0 | | 1970-01-01T00:00:25 | host1 |\
\n| 2.0 | 2.5 | 1970-01-01T00:00:25 | host1 |\
\n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
\n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
\n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
\n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
\n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
\n| 5.0 | | 1970-01-01T00:00:25 | host2 |\
\n| 5.0 | 5.5 | 1970-01-01T00:00:25 | host2 |\
\n+------------+------------+---------------------+-------+",
);
do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, false, expected).await;
Expand Down Expand Up @@ -1339,29 +1402,72 @@ mod test {
ScalarValue::UInt8(None),
ScalarValue::UInt8(Some(9)),
];
Fill::Null.apply_fill_strategy(&mut test1).unwrap();
Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
assert_eq!(test1[1], ScalarValue::UInt8(None));
Fill::Prev.apply_fill_strategy(&mut test1).unwrap();
Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
test1[1] = ScalarValue::UInt8(None);
Fill::Const(ScalarValue::UInt8(Some(10)))
.apply_fill_strategy(&mut test1)
.apply_fill_strategy(&[], &mut test1)
.unwrap();
assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
test1[1] = ScalarValue::UInt8(None);
assert_eq!(
Fill::Linear
.apply_fill_strategy(&mut test1)
.unwrap_err()
.to_string(),
"Execution error: RangePlan: Apply Fill LINEAR strategy on Non-floating type"
);
let mut test2 = vec![
ScalarValue::Float32(Some(8.0)),
}

#[test]
fn test_fill_linear() {
let ts = vec![1, 2, 3, 4, 5];
let mut test = vec![
ScalarValue::Float32(Some(1.0)),
ScalarValue::Float32(None),
ScalarValue::Float32(Some(3.0)),
ScalarValue::Float32(None),
ScalarValue::Float32(Some(9.0)),
ScalarValue::Float32(Some(5.0)),
];
Fill::Linear.apply_fill_strategy(&mut test2).unwrap();
assert_eq!(test2[1], ScalarValue::Float32(Some(8.5)));
Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
let mut test1 = vec![
ScalarValue::Float32(None),
ScalarValue::Float32(Some(2.0)),
ScalarValue::Float32(None),
ScalarValue::Float32(Some(4.0)),
ScalarValue::Float32(None),
];
Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
assert_eq!(test, test1);
// test linear interpolation on irregularly spaced ts/data
let ts = vec![
1, // None
3, // 1.0
8, // 11.0
30, // None
88, // 10.0
108, // 5.0
128, // None
];
let mut test = vec![
ScalarValue::Float64(None),
ScalarValue::Float64(Some(1.0)),
ScalarValue::Float64(Some(11.0)),
ScalarValue::Float64(None),
ScalarValue::Float64(Some(10.0)),
ScalarValue::Float64(Some(5.0)),
ScalarValue::Float64(None),
];
Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
let data: Vec<_> = test
.into_iter()
.map(|x| {
let ScalarValue::Float64(Some(f)) = x else {
unreachable!()
};
f
})
.collect();
assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
// test corner case
let ts = vec![1];
let test = vec![ScalarValue::Float32(None)];
let mut test1 = test.clone();
Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
assert_eq!(test, test1);
}
}

0 comments on commit 370ec04

Please sign in to comment.