From 1376e0e054271d2ca9f775fa668015cc9e430cab Mon Sep 17 00:00:00 2001
From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com>
Date: Wed, 27 Sep 2023 18:51:35 +0800
Subject: [PATCH] fix(expr): return error when get_window_start_with_offset
 overflows (#12566)

---
 src/expr/impl/src/scalar/tumble.rs | 44 ++++++++++++++++++++++--------
 1 file changed, 33 insertions(+), 11 deletions(-)

diff --git a/src/expr/impl/src/scalar/tumble.rs b/src/expr/impl/src/scalar/tumble.rs
index bf6dddfd7bbf9..507bbbd531ac4 100644
--- a/src/expr/impl/src/scalar/tumble.rs
+++ b/src/expr/impl/src/scalar/tumble.rs
@@ -14,13 +14,21 @@
 
 use num_traits::Zero;
 use risingwave_common::types::{Date, Interval, Timestamp, Timestamptz};
-use risingwave_expr::{function, Result};
+use risingwave_expr::{function, ExprError, Result};
 
 #[inline(always)]
-fn interval_to_micro_second(t: Interval) -> i64 {
-    t.months() as i64 * Interval::USECS_PER_MONTH
-        + t.days() as i64 * Interval::USECS_PER_DAY
-        + t.usecs()
+fn interval_to_micro_second(t: Interval) -> Result<i64> {
+    let checked_interval_to_micro_second = || {
+        (t.months() as i64)
+            .checked_mul(Interval::USECS_PER_MONTH)?
+            .checked_add(
+                (t.days() as i64)
+                    .checked_mul(Interval::USECS_PER_DAY)?
+                    .checked_add(t.usecs())?,
+            )
+    };
+
+    checked_interval_to_micro_second().ok_or(ExprError::NumericOutOfRange)
 }
 
 #[function("tumble_start(date, interval) -> timestamp")]
@@ -80,15 +88,23 @@ fn get_window_start_with_offset(
     window_size: Interval,
     offset: Interval,
 ) -> Result<i64> {
-    let window_size_micro_second = interval_to_micro_second(window_size);
-    let offset_micro_second = interval_to_micro_second(offset);
+    let window_size_micro_second = interval_to_micro_second(window_size)?;
+    let offset_micro_second = interval_to_micro_second(offset)?;
 
     // Inspired by https://issues.apache.org/jira/browse/FLINK-26334
-    let remainder = (timestamp_micro_second - offset_micro_second) % window_size_micro_second;
+    let remainder = timestamp_micro_second
+        .checked_sub(offset_micro_second)
+        .ok_or(ExprError::NumericOutOfRange)?
+        .checked_rem(window_size_micro_second)
+        .ok_or(ExprError::DivisionByZero)?;
     if remainder < 0 {
-        Ok(timestamp_micro_second - (remainder + window_size_micro_second))
+        timestamp_micro_second
+            .checked_sub(remainder + window_size_micro_second)
+            .ok_or(ExprError::NumericOutOfRange)
     } else {
-        Ok(timestamp_micro_second - remainder)
+        timestamp_micro_second
+            .checked_sub(remainder)
+            .ok_or(ExprError::NumericOutOfRange)
     }
 }
 
@@ -167,7 +183,7 @@ mod tests {
             let window_size = Interval::from_minutes(5);
             let window_start = get_window_start(timestamp_micro_second, window_size).unwrap();
 
-            let window_size_micro_second = interval_to_micro_second(window_size);
+            let window_size_micro_second = interval_to_micro_second(window_size).unwrap();
             let default_window_start = timestamp_micro_second
                 - (timestamp_micro_second + window_size_micro_second) % window_size_micro_second;
 
@@ -180,4 +196,10 @@ mod tests {
         }
         assert_ne!(wrong_cnt, 0);
     }
+
+    #[test]
+    fn test_window_start_overflow() {
+        get_window_start(i64::MIN, Interval::from_millis(20)).unwrap_err();
+        interval_to_micro_second(Interval::from_month_day_usec(1, 1, i64::MAX)).unwrap_err();
+    }
 }