From 9baff0880b00fd26f316693a4a2fa09925d45ea6 Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 18:44:54 +0800 Subject: [PATCH] fix: min, max with append only --- dozer-sql/expression/src/aggregate.rs | 6 + dozer-sql/expression/src/execution.rs | 2 + dozer-sql/src/aggregation/max_append_only.rs | 176 +++++ dozer-sql/src/aggregation/min_append_only.rs | 176 +++++ dozer-sql/src/aggregation/mod.rs | 4 + .../aggregation_max_append_only_tests.rs | 607 ++++++++++++++++++ .../aggregation_min_append_only_tests.rs | 607 ++++++++++++++++++ dozer-sql/src/aggregation/tests/mod.rs | 5 + 8 files changed, 1583 insertions(+) create mode 100644 dozer-sql/src/aggregation/max_append_only.rs create mode 100644 dozer-sql/src/aggregation/min_append_only.rs create mode 100644 dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs create mode 100644 dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs diff --git a/dozer-sql/expression/src/aggregate.rs b/dozer-sql/expression/src/aggregate.rs index d136358979..fd1dcf002c 100644 --- a/dozer-sql/expression/src/aggregate.rs +++ b/dozer-sql/expression/src/aggregate.rs @@ -5,8 +5,10 @@ pub enum AggregateFunctionType { Avg, Count, Max, + MaxAppendOnly, MaxValue, Min, + MinAppendOnly, MinValue, Sum, } @@ -17,8 +19,10 @@ impl AggregateFunctionType { "avg" => Some(AggregateFunctionType::Avg), "count" => Some(AggregateFunctionType::Count), "max" => Some(AggregateFunctionType::Max), + "max_append_only" => Some(AggregateFunctionType::Max), "max_value" => Some(AggregateFunctionType::MaxValue), "min" => Some(AggregateFunctionType::Min), + "min_append_only" => Some(AggregateFunctionType::Min), "min_value" => Some(AggregateFunctionType::MinValue), "sum" => Some(AggregateFunctionType::Sum), _ => None, @@ -32,8 +36,10 @@ impl Display for AggregateFunctionType { AggregateFunctionType::Avg => f.write_str("AVG"), AggregateFunctionType::Count => f.write_str("COUNT"), AggregateFunctionType::Max => f.write_str("MAX"), + AggregateFunctionType::MaxAppendOnly => f.write_str("MAX_APPEND_ONLY"), AggregateFunctionType::MaxValue => f.write_str("MAX_VALUE"), AggregateFunctionType::Min => f.write_str("MIN"), + AggregateFunctionType::MinAppendOnly => f.write_str("MIN_APPEND_ONLY"), AggregateFunctionType::MinValue => f.write_str("MIN_VALUE"), AggregateFunctionType::Sum => f.write_str("SUM"), } diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs index 54e069a743..7f29d89416 100644 --- a/dozer-sql/expression/src/execution.rs +++ b/dozer-sql/expression/src/execution.rs @@ -721,8 +721,10 @@ fn get_aggregate_function_type( AggregateFunctionType::Avg => validate_avg(args, schema), AggregateFunctionType::Count => validate_count(args, schema), AggregateFunctionType::Max => validate_max(args, schema), + AggregateFunctionType::MaxAppendOnly => validate_max(args, schema), AggregateFunctionType::MaxValue => validate_max_value(args, schema), AggregateFunctionType::Min => validate_min(args, schema), + AggregateFunctionType::MinAppendOnly => validate_min(args, schema), AggregateFunctionType::MinValue => validate_min_value(args, schema), AggregateFunctionType::Sum => validate_sum(args, schema), } diff --git a/dozer-sql/src/aggregation/max_append_only.rs b/dozer-sql/src/aggregation/max_append_only.rs new file mode 100644 index 0000000000..d8660b87b7 --- /dev/null +++ b/dozer-sql/src/aggregation/max_append_only.rs @@ -0,0 +1,176 @@ +use crate::aggregation::aggregator::{Aggregator}; + +use crate::errors::{PipelineError, UnsupportedSqlError}; +use crate::{calculate_err_field}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MaxAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MaxAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MaxAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MaxAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_max = self.current_state.clone(); + + for val in new { + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u64::MIN, + _ => calculate_err_field!(cur_max.to_uint(), MaxAppendOnly, val), + }; + if new_val > max_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u128::MIN, + _ => calculate_err_field!(cur_max.to_u128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i64::MIN, + _ => calculate_err_field!(cur_max.to_int(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i128::MIN, + _ => calculate_err_field!(cur_max.to_i128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => f64::MIN, + _ => calculate_err_field!(cur_max.to_float(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => Decimal::MIN, + _ => calculate_err_field!(cur_max.to_decimal(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => DateTime::::from(DateTime::::MIN_UTC), + _ => calculate_err_field!(cur_max.to_timestamp(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => NaiveDate::MIN, + _ => calculate_err_field!(cur_max.to_date(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => { + DozerDuration(std::time::Duration::ZERO, TimeUnit::Nanoseconds) + } + _ => calculate_err_field!(cur_max.to_duration(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MaxAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MaxAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/min_append_only.rs b/dozer-sql/src/aggregation/min_append_only.rs new file mode 100644 index 0000000000..d8b0f6618f --- /dev/null +++ b/dozer-sql/src/aggregation/min_append_only.rs @@ -0,0 +1,176 @@ +use crate::aggregation::aggregator::{Aggregator}; + +use crate::errors::{PipelineError, UnsupportedSqlError}; +use crate::{calculate_err_field}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MinAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MinAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MinAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MinAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_min = self.current_state.clone(); + + for val in new { + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u64::MAX, + _ => calculate_err_field!(cur_min.to_uint(), MinAppendOnly, val), + }; + if new_val < min_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u128::MAX, + _ => calculate_err_field!(cur_min.to_u128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i64::MAX, + _ => calculate_err_field!(cur_min.to_int(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i128::MAX, + _ => calculate_err_field!(cur_min.to_i128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => f64::MAX, + _ => calculate_err_field!(cur_min.to_float(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => Decimal::MAX, + _ => calculate_err_field!(cur_min.to_decimal(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => DateTime::::from(DateTime::::MAX_UTC), + _ => calculate_err_field!(cur_min.to_timestamp(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => NaiveDate::MAX, + _ => calculate_err_field!(cur_min.to_date(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => { + DozerDuration(std::time::Duration::MAX, TimeUnit::Nanoseconds) + } + _ => calculate_err_field!(cur_min.to_duration(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MinAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MinAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/mod.rs b/dozer-sql/src/aggregation/mod.rs index 1feae0a3e4..7acf6db3db 100644 --- a/dozer-sql/src/aggregation/mod.rs +++ b/dozer-sql/src/aggregation/mod.rs @@ -9,3 +9,7 @@ pub mod min_value; pub mod processor; pub mod sum; mod tests; + +pub mod max_append_only; +pub mod max_value_append_only; +pub mod min_append_only; diff --git a/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs new file mode 100644 index 0000000000..1b13a1e37a --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs @@ -0,0 +1,607 @@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, + init_input_schema, init_processor, insert_exp, insert_field, update_exp, + DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_max_aggregation_float() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_uint() { + let schema = init_input_schema(UInt, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MAX_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + ------------------ + MAX_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MAX_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int_null() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_float_null() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date_null() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs new file mode 100644 index 0000000000..95d76c2255 --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs @@ -0,0 +1,607 @@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, + init_input_schema, init_processor, insert_exp, insert_field, update_exp, + DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_min_aggregation_float() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_uint() { + let schema = init_input_schema(UInt, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MIN_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + ------------------ + MIN_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MIN_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int_null() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_float_null() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date_null() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/mod.rs b/dozer-sql/src/aggregation/tests/mod.rs index 99436725e0..9d5eafa52c 100644 --- a/dozer-sql/src/aggregation/tests/mod.rs +++ b/dozer-sql/src/aggregation/tests/mod.rs @@ -20,3 +20,8 @@ mod aggregation_sum_tests; mod aggregation_test_planner; #[cfg(test)] mod aggregation_tests_utils; + +#[cfg(test)] +mod aggregation_max_append_only_tests; +#[cfg(test)] +mod aggregation_min_append_only_tests;