From d6035dd48dcd03605e6eea963f30959fb40f4a9c Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 17 May 2024 16:31:40 +0800 Subject: [PATCH] fix: tests&tumble signature&accept both ts&datetime --- src/flow/src/compute/render.rs | 2 +- src/flow/src/compute/render/reduce.rs | 16 +++++------ src/flow/src/expr/func.rs | 38 ++++++++++++++------------- src/flow/src/expr/scalar.rs | 2 +- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 8279974b4781..44b025359852 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -223,11 +223,11 @@ mod test { use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::graph_ext::GraphExt; use hydroflow::scheduled::handoff::VecHandoff; + use pretty_assertions::{assert_eq, assert_ne}; use super::*; use crate::expr::BinaryFunc; use crate::repr::Row; - pub fn run_and_check( state: &mut DataflowState, df: &mut Hydroflow, diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 24c117442ac0..7aa10e657901 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -739,7 +739,7 @@ mod test { use std::cell::RefCell; use std::rc::Rc; - use common_time::{DateTime, Interval}; + use common_time::{DateTime, Interval, Timestamp}; use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT}; use hydroflow::scheduled::graph::Hydroflow; @@ -770,7 +770,7 @@ mod test { .into_iter() .map(|(number, ts)| { ( - Row::new(vec![number.into(), DateTime::new(ts).into()]), + Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]), 1, 1, ) @@ -879,8 +879,8 @@ mod test { ( Row::new(vec![ 3u64.into(), - DateTime::new(START + 1000).into(), - DateTime::new(START + 2000).into(), + Timestamp::new_millisecond(START + 1000).into(), + Timestamp::new_millisecond(START + 2000).into(), ]), 1, 1, @@ -888,8 +888,8 @@ mod test { ( Row::new(vec![ 4u64.into(), - 
DateTime::new(START + 2000).into(), - DateTime::new(START + 3000).into(), + Timestamp::new_millisecond(START + 2000).into(), + Timestamp::new_millisecond(START + 3000).into(), ]), 1, 1, @@ -897,8 +897,8 @@ mod test { ( Row::new(vec![ 5u64.into(), - DateTime::new(START + 3000).into(), - DateTime::new(START + 4000).into(), + Timestamp::new_millisecond(START + 3000).into(), + Timestamp::new_millisecond(START + 4000).into(), ]), 1, 1, diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 6f7752e58f92..b8c8ac7f3649 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -69,8 +69,8 @@ impl UnmaterializableFunc { generic_fn: GenericFn::CurrentSchema, }, Self::TumbleWindow { .. } => Signature { - input: smallvec![ConcreteDataType::datetime_datatype()], - output: ConcreteDataType::datetime_datatype(), + input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()], + output: ConcreteDataType::timestamp_millisecond_datatype(), generic_fn: GenericFn::TumbleWindow, }, } @@ -277,14 +277,7 @@ impl UnaryFunc { window_size, start_time, } => { - let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu { - expected: ConcreteDataType::timestamp_millisecond_datatype(), - actual: arg.data_type(), - })?; - let ts = ts - .convert_to(TimeUnit::Millisecond) - .context(OverflowSnafu)? - .value(); + let ts = get_ts_as_millisecond(arg)?; let start_time = start_time.map(|t| t.val()).unwrap_or(0); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond let window_start = start_time + (ts - start_time) / window_size * window_size; @@ -296,14 +289,7 @@ impl UnaryFunc { window_size, start_time, } => { - let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu { - expected: ConcreteDataType::timestamp_millisecond_datatype(), - actual: arg.data_type(), - })?; - let ts = ts - .convert_to(TimeUnit::Millisecond) - .context(OverflowSnafu)? 
- .value(); + let ts = get_ts_as_millisecond(arg)?; let start_time = start_time.map(|t| t.val()).unwrap_or(0); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond let window_start = start_time + (ts - start_time) / window_size * window_size; @@ -316,6 +302,22 @@ impl UnaryFunc { } } +fn get_ts_as_millisecond(arg: Value) -> Result { + let ts = if let Some(ts) = arg.as_timestamp() { + ts.convert_to(TimeUnit::Millisecond) + .context(OverflowSnafu)? + .value() + } else if let Some(ts) = arg.as_datetime() { + ts.val() + } else { + InvalidArgumentSnafu { + reason: "Expect input to be timestamp or datetime type", + } + .fail()? + }; + Ok(ts) +} + /// BinaryFunc is a function that takes two arguments. /// Also notice this enum doesn't contain function arguments, since the arguments are stored in the expression. /// diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index dfd5fcd0f214..53c570e7a6c2 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -106,7 +106,7 @@ impl TypedExpr { }) .collect::<Result<Vec<_>, _>>()?; - Ok(exprs) + Ok(dbg!(exprs)) } }