Skip to content

Commit

Permalink
fix: tests&tumble signature&accept both ts&datetime
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 17, 2024
1 parent f2d8fbf commit d6035dd
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ mod test {
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::{assert_eq, assert_ne};

use super::*;
use crate::expr::BinaryFunc;
use crate::repr::Row;

pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
Expand Down
16 changes: 8 additions & 8 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;

use common_time::{DateTime, Interval};
use common_time::{DateTime, Interval, Timestamp};
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use hydroflow::scheduled::graph::Hydroflow;

Expand Down Expand Up @@ -770,7 +770,7 @@ mod test {
.into_iter()
.map(|(number, ts)| {
(
Row::new(vec![number.into(), DateTime::new(ts).into()]),
Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1,
1,
)
Expand Down Expand Up @@ -879,26 +879,26 @@ mod test {
(
Row::new(vec![
3u64.into(),
DateTime::new(START + 1000).into(),
DateTime::new(START + 2000).into(),
Timestamp::new_millisecond(START + 1000).into(),
Timestamp::new_millisecond(START + 2000).into(),
]),
1,
1,
),
(
Row::new(vec![
4u64.into(),
DateTime::new(START + 2000).into(),
DateTime::new(START + 3000).into(),
Timestamp::new_millisecond(START + 2000).into(),
Timestamp::new_millisecond(START + 3000).into(),
]),
1,
1,
),
(
Row::new(vec![
5u64.into(),
DateTime::new(START + 3000).into(),
DateTime::new(START + 4000).into(),
Timestamp::new_millisecond(START + 3000).into(),
Timestamp::new_millisecond(START + 4000).into(),
]),
1,
1,
Expand Down
38 changes: 20 additions & 18 deletions src/flow/src/expr/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl UnmaterializableFunc {
generic_fn: GenericFn::CurrentSchema,
},
Self::TumbleWindow { .. } => Signature {
input: smallvec![ConcreteDataType::datetime_datatype()],
output: ConcreteDataType::datetime_datatype(),
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::TumbleWindow,
},
}
Expand Down Expand Up @@ -277,14 +277,7 @@ impl UnaryFunc {
window_size,
start_time,
} => {
let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: arg.data_type(),
})?;
let ts = ts
.convert_to(TimeUnit::Millisecond)
.context(OverflowSnafu)?
.value();
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
Expand All @@ -296,14 +289,7 @@ impl UnaryFunc {
window_size,
start_time,
} => {
let ts = arg.as_timestamp().with_context(|| TypeMismatchSnafu {
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: arg.data_type(),
})?;
let ts = ts
.convert_to(TimeUnit::Millisecond)
.context(OverflowSnafu)?
.value();
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
Expand All @@ -316,6 +302,22 @@ impl UnaryFunc {
}
}

fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
let ts = if let Some(ts) = arg.as_timestamp() {
ts.convert_to(TimeUnit::Millisecond)
.context(OverflowSnafu)?
.value()
} else if let Some(ts) = arg.as_datetime() {
ts.val()
} else {
InvalidArgumentSnafu {
reason: "Expect input to be timestamp or datetime type",
}
.fail()?
};
Ok(ts)
}

/// BinaryFunc is a function that takes two arguments.
/// Also notice this enum doesn't contain function arguments, since the arguments are stored in the expression.
///
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/expr/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl TypedExpr {
})
.collect::<Result<Vec<_>, _>>()?;

Ok(exprs)
Ok(dbg!(exprs))
}
}

Expand Down

0 comments on commit d6035dd

Please sign in to comment.