Skip to content

Commit

Permalink
fix(expr): return error when get_window_start_with_offset overflows (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Sep 27, 2023
1 parent 491273d commit 1376e0e
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions src/expr/impl/src/scalar/tumble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,21 @@

use num_traits::Zero;
use risingwave_common::types::{Date, Interval, Timestamp, Timestamptz};
use risingwave_expr::{function, Result};
use risingwave_expr::{function, ExprError, Result};

#[inline(always)]
fn interval_to_micro_second(t: Interval) -> i64 {
t.months() as i64 * Interval::USECS_PER_MONTH
+ t.days() as i64 * Interval::USECS_PER_DAY
+ t.usecs()
fn interval_to_micro_second(t: Interval) -> Result<i64> {
let checked_interval_to_micro_second = || {
(t.months() as i64)
.checked_mul(Interval::USECS_PER_MONTH)?
.checked_add(
(t.days() as i64)
.checked_mul(Interval::USECS_PER_DAY)?
.checked_add(t.usecs())?,
)
};

checked_interval_to_micro_second().ok_or(ExprError::NumericOutOfRange)
}

#[function("tumble_start(date, interval) -> timestamp")]
Expand Down Expand Up @@ -80,15 +88,23 @@ fn get_window_start_with_offset(
window_size: Interval,
offset: Interval,
) -> Result<i64> {
let window_size_micro_second = interval_to_micro_second(window_size);
let offset_micro_second = interval_to_micro_second(offset);
let window_size_micro_second = interval_to_micro_second(window_size)?;
let offset_micro_second = interval_to_micro_second(offset)?;

// Inspired by https://issues.apache.org/jira/browse/FLINK-26334
let remainder = (timestamp_micro_second - offset_micro_second) % window_size_micro_second;
let remainder = timestamp_micro_second
.checked_sub(offset_micro_second)
.ok_or(ExprError::NumericOutOfRange)?
.checked_rem(window_size_micro_second)
.ok_or(ExprError::DivisionByZero)?;
if remainder < 0 {
Ok(timestamp_micro_second - (remainder + window_size_micro_second))
timestamp_micro_second
.checked_sub(remainder + window_size_micro_second)
.ok_or(ExprError::NumericOutOfRange)
} else {
Ok(timestamp_micro_second - remainder)
timestamp_micro_second
.checked_sub(remainder)
.ok_or(ExprError::NumericOutOfRange)
}
}

Expand Down Expand Up @@ -167,7 +183,7 @@ mod tests {
let window_size = Interval::from_minutes(5);
let window_start = get_window_start(timestamp_micro_second, window_size).unwrap();

let window_size_micro_second = interval_to_micro_second(window_size);
let window_size_micro_second = interval_to_micro_second(window_size).unwrap();
let default_window_start = timestamp_micro_second
- (timestamp_micro_second + window_size_micro_second) % window_size_micro_second;

Expand All @@ -180,4 +196,10 @@ mod tests {
}
assert_ne!(wrong_cnt, 0);
}

#[test]
fn test_window_start_overflow() {
get_window_start(i64::MIN, Interval::from_millis(20)).unwrap_err();
interval_to_micro_second(Interval::from_month_day_usec(1, 1, i64::MAX)).unwrap_err();
}
}

0 comments on commit 1376e0e

Please sign in to comment.