diff --git a/dozer-sql/expression/src/aggregate.rs b/dozer-sql/expression/src/aggregate.rs
index d136358979..6e1e3d2a6e 100644
--- a/dozer-sql/expression/src/aggregate.rs
+++ b/dozer-sql/expression/src/aggregate.rs
@@ -5,8 +5,10 @@ pub enum AggregateFunctionType {
     Avg,
     Count,
     Max,
+    MaxAppendOnly,
     MaxValue,
     Min,
+    MinAppendOnly,
     MinValue,
     Sum,
 }
@@ -17,8 +19,10 @@ impl AggregateFunctionType {
             "avg" => Some(AggregateFunctionType::Avg),
             "count" => Some(AggregateFunctionType::Count),
             "max" => Some(AggregateFunctionType::Max),
+            "max_append_only" => Some(AggregateFunctionType::MaxAppendOnly),
             "max_value" => Some(AggregateFunctionType::MaxValue),
             "min" => Some(AggregateFunctionType::Min),
+            "min_append_only" => Some(AggregateFunctionType::MinAppendOnly),
             "min_value" => Some(AggregateFunctionType::MinValue),
             "sum" => Some(AggregateFunctionType::Sum),
             _ => None,
@@ -32,8 +36,10 @@ impl Display for AggregateFunctionType {
             AggregateFunctionType::Avg => f.write_str("AVG"),
             AggregateFunctionType::Count => f.write_str("COUNT"),
             AggregateFunctionType::Max => f.write_str("MAX"),
+            AggregateFunctionType::MaxAppendOnly => f.write_str("MAX_APPEND_ONLY"),
             AggregateFunctionType::MaxValue => f.write_str("MAX_VALUE"),
             AggregateFunctionType::Min => f.write_str("MIN"),
+            AggregateFunctionType::MinAppendOnly => f.write_str("MIN_APPEND_ONLY"),
             AggregateFunctionType::MinValue => f.write_str("MIN_VALUE"),
             AggregateFunctionType::Sum => f.write_str("SUM"),
         }
diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs
index 54e069a743..a7a618277e 100644
--- a/dozer-sql/expression/src/execution.rs
+++ b/dozer-sql/expression/src/execution.rs
@@ -721,8 +721,10 @@ fn get_aggregate_function_type(
         AggregateFunctionType::Avg => validate_avg(args, schema),
         AggregateFunctionType::Count => validate_count(args, schema),
         AggregateFunctionType::Max => validate_max(args, schema),
+        AggregateFunctionType::MaxAppendOnly => validate_max_append_only(args, schema),
         AggregateFunctionType::MaxValue => validate_max_value(args, schema),
         AggregateFunctionType::Min => validate_min(args, schema),
+        AggregateFunctionType::MinAppendOnly => validate_min_append_only(args, schema),
         AggregateFunctionType::MinValue => validate_min_value(args, schema),
         AggregateFunctionType::Sum => validate_sum(args, schema),
     }
@@ -863,6 +865,97 @@ fn validate_min(args: &[Expression], schema: &Schema) -> Result<ExpressionType, Error> {
             });
         }
     };
+    Ok(ExpressionType::new(
+        ret_type,
+        true,
+        SourceDefinition::Dynamic,
+        false,
+    ))
+}
+
+fn validate_max_append_only(args: &[Expression], schema: &Schema) -> Result<ExpressionType, Error> {
+    let arg = validate_one_argument(args, schema, AggregateFunctionType::MaxAppendOnly)?;
+
+    let ret_type = match arg.return_type {
+        FieldType::UInt => FieldType::UInt,
+        FieldType::U128 => FieldType::U128,
+        FieldType::Int => FieldType::Int,
+        FieldType::I128 => FieldType::I128,
+        FieldType::Float => FieldType::Float,
+        FieldType::Decimal => FieldType::Decimal,
+        FieldType::Timestamp => FieldType::Timestamp,
+        FieldType::Date => FieldType::Date,
+        FieldType::Duration => FieldType::Duration,
+        FieldType::Boolean
+        | FieldType::String
+        | FieldType::Text
+        | FieldType::Binary
+        | FieldType::Json
+        | FieldType::Point => {
+            return Err(Error::InvalidFunctionArgumentType {
+                function_name: AggregateFunctionType::MaxAppendOnly.to_string(),
+                argument_index: 0,
+                actual: arg.return_type,
+                expected: vec![
+                    FieldType::Decimal,
+                    FieldType::UInt,
+                    FieldType::U128,
+                    FieldType::Int,
+                    FieldType::I128,
+                    FieldType::Float,
+                    FieldType::Timestamp,
+                    FieldType::Date,
+                    FieldType::Duration,
+                ],
+            });
+        }
+    };
+    Ok(ExpressionType::new(
+        ret_type,
+        true,
+        SourceDefinition::Dynamic,
+        false,
+    ))
+}
+
+fn validate_min_append_only(args: &[Expression], schema: &Schema) -> Result<ExpressionType, Error> {
+    let arg = validate_one_argument(args, schema, AggregateFunctionType::MinAppendOnly)?;
+
+    let ret_type = match arg.return_type {
+        FieldType::UInt => FieldType::UInt,
+        FieldType::U128 => FieldType::U128,
+        FieldType::Int => FieldType::Int,
+        FieldType::I128 => FieldType::I128,
+        FieldType::Float => FieldType::Float,
+        FieldType::Decimal => FieldType::Decimal,
+        FieldType::Timestamp => FieldType::Timestamp,
+        FieldType::Date => FieldType::Date,
+        FieldType::Duration => FieldType::Duration,
+        FieldType::Boolean
+        | FieldType::String
+        | FieldType::Text
+        | FieldType::Binary
+        | FieldType::Json
+        | FieldType::Point => {
+            return Err(Error::InvalidFunctionArgumentType {
+                function_name: AggregateFunctionType::MinAppendOnly.to_string(),
+                argument_index: 0,
+                actual: arg.return_type,
+                expected: vec![
+                    FieldType::Decimal,
+                    FieldType::UInt,
+                    FieldType::U128,
+                    FieldType::Int,
+                    FieldType::I128,
+                    FieldType::Float,
+                    FieldType::Timestamp,
+                    FieldType::Date,
+                    FieldType::Duration,
+                ],
+            });
+        }
+    };
     Ok(ExpressionType::new(
         ret_type,
         true,
diff --git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs
index 14fef33f84..bd85a1d0ad 100644
--- a/dozer-sql/src/aggregation/aggregator.rs
+++ b/dozer-sql/src/aggregation/aggregator.rs
@@ -14,7 +14,9 @@ use std::collections::BTreeMap;
 use dozer_sql_expression::aggregate::AggregateFunctionType;
 use dozer_sql_expression::execution::Expression;
 
+use crate::aggregation::max_append_only::MaxAppendOnlyAggregator;
 use crate::aggregation::max_value::MaxValueAggregator;
+use crate::aggregation::min_append_only::MinAppendOnlyAggregator;
 use crate::aggregation::min_value::MinValueAggregator;
 use crate::errors::PipelineError::{InvalidFunctionArgument, InvalidValue};
 use dozer_sql_expression::aggregate::AggregateFunctionType::MaxValue;
@@ -35,8 +37,10 @@ pub trait Aggregator: Send + Sync + Serialize + DeserializeOwned {
 pub enum AggregatorEnum {
     AvgAggregator,
     MinAggregator,
+    MinAppendOnlyAggregator,
     MinValueAggregator,
     MaxAggregator,
+    MaxAppendOnlyAggregator,
     MaxValueAggregator,
     SumAggregator,
     CountAggregator,
 }
@@ -47,8 +51,10 @@ pub enum AggregatorType {
     Avg,
     Count,
     Max,
+    MaxAppendOnly,
     MaxValue,
     Min,
+    MinAppendOnly,
     MinValue,
     Sum,
 }
@@ -59,8 +65,10 @@ impl Display for AggregatorType {
             AggregatorType::Avg => f.write_str("avg"),
             AggregatorType::Count => f.write_str("count"),
             AggregatorType::Max => f.write_str("max"),
+            AggregatorType::MaxAppendOnly => f.write_str("max_append_only"),
             AggregatorType::MaxValue => f.write_str("max_value"),
             AggregatorType::Min => f.write_str("min"),
+            AggregatorType::MinAppendOnly => f.write_str("min_append_only"),
             AggregatorType::MinValue => f.write_str("min_value"),
             AggregatorType::Sum => f.write_str("sum"),
         }
@@ -72,8 +80,10 @@ pub fn get_aggregator_from_aggregator_type(typ: AggregatorType) -> AggregatorEnu
         AggregatorType::Avg => AvgAggregator::new().into(),
         AggregatorType::Count => CountAggregator::new().into(),
         AggregatorType::Max => MaxAggregator::new().into(),
+        AggregatorType::MaxAppendOnly => MaxAppendOnlyAggregator::new().into(),
         AggregatorType::MaxValue => MaxValueAggregator::new().into(),
         AggregatorType::Min => MinAggregator::new().into(),
+        AggregatorType::MinAppendOnly => MinAppendOnlyAggregator::new().into(),
         AggregatorType::MinValue => MinValueAggregator::new().into(),
         AggregatorType::Sum => SumAggregator::new().into(),
     }
@@ -108,6 +118,20 @@ pub fn get_aggregator_type_from_aggregation_expression(
             .clone()],
AggregatorType::Min, )), + Expression::AggregateFunction { + fun: AggregateFunctionType::MinAppendOnly, + args, + } => Ok(( + vec![args + .get(0) + .ok_or_else(|| { + PipelineError::NotEnoughArguments( + AggregateFunctionType::MinAppendOnly.to_string(), + ) + })? + .clone()], + AggregatorType::MinAppendOnly, + )), Expression::AggregateFunction { fun: AggregateFunctionType::Max, args, @@ -120,6 +144,20 @@ pub fn get_aggregator_type_from_aggregation_expression( .clone()], AggregatorType::Max, )), + Expression::AggregateFunction { + fun: AggregateFunctionType::MaxAppendOnly, + args, + } => Ok(( + vec![args + .get(0) + .ok_or_else(|| { + PipelineError::NotEnoughArguments( + AggregateFunctionType::MaxAppendOnly.to_string(), + ) + })? + .clone()], + AggregatorType::MaxAppendOnly, + )), Expression::AggregateFunction { fun: AggregateFunctionType::MaxValue, args, diff --git a/dozer-sql/src/aggregation/max_append_only.rs b/dozer-sql/src/aggregation/max_append_only.rs new file mode 100644 index 0000000000..e17705f664 --- /dev/null +++ b/dozer-sql/src/aggregation/max_append_only.rs @@ -0,0 +1,179 @@ +use crate::aggregation::aggregator::Aggregator; + +use crate::calculate_err_field; +use crate::errors::{PipelineError, UnsupportedSqlError}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MaxAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MaxAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MaxAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MaxAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_max = self.current_state.clone(); + + for val in new { + if val == &Field::Null { + continue; + } + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u64::MIN, + _ => calculate_err_field!(cur_max.to_uint(), MaxAppendOnly, val), + }; + if new_val > max_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u128::MIN, + _ => calculate_err_field!(cur_max.to_u128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i64::MIN, + _ => calculate_err_field!(cur_max.to_int(), MaxAppendOnly, val), + }; + + if new_val > max_val 
{ + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i128::MIN, + _ => calculate_err_field!(cur_max.to_i128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => f64::MIN, + _ => calculate_err_field!(cur_max.to_float(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => Decimal::MIN, + _ => calculate_err_field!(cur_max.to_decimal(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => DateTime::::from(DateTime::::MIN_UTC), + _ => calculate_err_field!(cur_max.to_timestamp(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => NaiveDate::MIN, + _ => calculate_err_field!(cur_max.to_date(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => { + DozerDuration(std::time::Duration::ZERO, TimeUnit::Nanoseconds) + } + _ => calculate_err_field!(cur_max.to_duration(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MaxAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MaxAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/min_append_only.rs b/dozer-sql/src/aggregation/min_append_only.rs new file mode 100644 index 0000000000..952e00ca7d --- /dev/null +++ b/dozer-sql/src/aggregation/min_append_only.rs @@ -0,0 +1,179 @@ +use crate::aggregation::aggregator::Aggregator; + +use crate::calculate_err_field; +use crate::errors::{PipelineError, UnsupportedSqlError}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MinAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MinAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MinAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn 
update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MinAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_min = self.current_state.clone(); + + for val in new { + if val == &Field::Null { + continue; + } + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u64::MAX, + _ => calculate_err_field!(cur_min.to_uint(), MinAppendOnly, val), + }; + if new_val < min_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u128::MAX, + _ => calculate_err_field!(cur_min.to_u128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i64::MAX, + _ => calculate_err_field!(cur_min.to_int(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i128::MAX, + _ => calculate_err_field!(cur_min.to_i128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => f64::MAX, + _ => calculate_err_field!(cur_min.to_float(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => Decimal::MAX, + _ => calculate_err_field!(cur_min.to_decimal(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => DateTime::::from(DateTime::::MAX_UTC), + _ => calculate_err_field!(cur_min.to_timestamp(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => NaiveDate::MAX, + _ => calculate_err_field!(cur_min.to_date(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => { + DozerDuration(std::time::Duration::MAX, TimeUnit::Nanoseconds) + } + _ => 
calculate_err_field!(cur_min.to_duration(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MinAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MinAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/mod.rs b/dozer-sql/src/aggregation/mod.rs index 1feae0a3e4..21bfebdfc8 100644 --- a/dozer-sql/src/aggregation/mod.rs +++ b/dozer-sql/src/aggregation/mod.rs @@ -9,3 +9,6 @@ pub mod min_value; pub mod processor; pub mod sum; mod tests; + +pub mod max_append_only; +pub mod min_append_only; diff --git a/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs new file mode 100644 index 0000000000..d26fbf2004 --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs @@ -0,0 +1,608 @@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, init_input_schema, + init_processor, insert_exp, insert_field, update_exp, DATE4, DATE8, FIELD_100_FLOAT, + FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, + SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_max_aggregation_float() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, 
FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_uint() { + let schema = init_input_schema(UInt, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 
for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MAX_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + ------------------ + MAX_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MAX_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = 
vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int_null() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_float_null() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + 
ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date_null() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs new file mode 100644 index 0000000000..7b66d5cac2 --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs @@ -0,0 +1,608 @@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, init_input_schema, + init_processor, insert_exp, insert_field, update_exp, DATE4, DATE8, FIELD_100_FLOAT, + FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, + SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_min_aggregation_float() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, 
ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_uint() { + let schema = init_input_schema(UInt, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + 
------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MIN_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + 
Italy, 2015-10-08 + ------------------ + MIN_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MIN_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int_null() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_float_null() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, 
inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date_null() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/mod.rs b/dozer-sql/src/aggregation/tests/mod.rs index 99436725e0..9d5eafa52c 100644 --- a/dozer-sql/src/aggregation/tests/mod.rs +++ b/dozer-sql/src/aggregation/tests/mod.rs @@ -20,3 +20,8 @@ mod aggregation_sum_tests; mod aggregation_test_planner; #[cfg(test)] mod aggregation_tests_utils; + +#[cfg(test)] +mod aggregation_max_append_only_tests; +#[cfg(test)] +mod aggregation_min_append_only_tests; diff --git a/dozer-tests/src/sql_tests/full/aggr_append_only.test b/dozer-tests/src/sql_tests/full/aggr_append_only.test new file mode 100644 index 0000000000..3f5e8b6aca --- /dev/null +++ b/dozer-tests/src/sql_tests/full/aggr_append_only.test @@ -0,0 +1,114 @@ +control sortmode 
rowsort + +statement ok +CREATE TABLE Users ( + id integer NOT NULL, + Name text NOT NULL, + City text, + Country text, + Salary numeric NOT NULL +) + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (1, 'John Smith', 'New York', 'USA', 50000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (2, 'Jane Doe', 'Los Angeles', 'USA', 60000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (3, 'Mike Johnson', 'Chicago', 'USA', 55000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (4, 'Karen Davis', 'Dallas', 'USA', 45000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (5, 'David Lee', 'Houston', 'USA', 65000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (6, 'Sarah Johnson', 'Miami', 'USA', 75000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (7, 'Tom Smith', 'Toronto', 'Canada', 80000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (8, 'Emily Davis', 'Seattle', 'USA', 55000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (9, 'Kevin Kim', 'Vancouver', 'Canada', 60000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (10, 'Maria Rodriguez', 'Mexico City', 'Mexico', 70000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (11, 'Hiroshi Yamamoto', 'Tokyo', 'Japan', 75000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (12, 'Anna Nguyen', 'Sydney', 'Australia', 65000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (13, 'Lee Min Ho', 'Seoul', 'South Korea', 85000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (14, 'Isabella Martinez', 'Madrid', 'Spain', 60000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (15, 'Luca Rossi', 'Rome', 'Italy', 50000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (16, 'Sophie Dupont', 'Paris', 'France', 70000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (17, 'Miguel Hernandez', 'Barcelona', 'Spain', 75000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (18, 'Emma Smith', 'London', 'UK', 80000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (19, 'Sven Gustavsson', 'Stockholm', 'Sweden', 65000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (20, 'Leila Ahmed', 'Cairo', 'Egypt', 55000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (21, 'Javier Torres', 'Buenos Aires', 'Argentina', 60000); + +statement ok +INSERT INTO Users (id, Name, City, Country, Salary) VALUES (22, 'Hans Muller', 'Berlin', 'Germany', 70000); + + +query TR +SELECT Country, MAX_APPEND_ONLY(Salary) FROM Users GROUP BY Country; +---- +Argentina 60000 +Australia 65000 +Canada 80000 +Egypt 55000 +France 70000 +Germany 70000 +Italy 50000 +Japan 75000 +Mexico 70000 +South Korea 85000 +Spain 75000 +Sweden 65000 +UK 80000 +USA 75000 + +query TR +SELECT Country, MIN_APPEND_ONLY(Salary) FROM Users WHERE Salary >= 1 GROUP BY Country; +---- +Argentina 60000 +Australia 65000 +Canada 60000 +Egypt 55000 +France 70000 +Germany 70000 +Italy 50000 +Japan 75000 +Mexico 70000 +South Korea 85000 +Spain 60000 +Sweden 65000 +UK 80000 +USA 45000 +
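
Note (not part of the patch): both new aggregators rely on the same append-only invariant — state only ever moves toward the extreme, so insert folds each incoming value into the running max/min, while update and delete are rejected with an "Append only" error. Below is a minimal, self-contained Rust sketch of that invariant, using plain i64 values in place of dozer's Field/FieldType handling; the names are illustrative and do not exist in the codebase.

use std::cmp::max;

#[derive(Debug, Default)]
struct MaxAppendOnlySketch {
    current: Option<i64>,
}

impl MaxAppendOnlySketch {
    // Fold an inserted value into the running maximum and return the new state.
    fn insert(&mut self, new: i64) -> i64 {
        let next = match self.current {
            Some(cur) => max(cur, new),
            None => new,
        };
        self.current = Some(next);
        next
    }

    // Retractions could require rescanning the whole group, so an append-only
    // aggregator refuses them, mirroring the UnsupportedSqlError::GenericError("Append only")
    // returned by the update/delete methods in the patch.
    fn delete(&mut self, _old: i64) -> Result<i64, String> {
        Err("Append only".to_string())
    }
}

fn main() {
    let mut agg = MaxAppendOnlySketch::default();
    assert_eq!(agg.insert(100), 100);
    assert_eq!(agg.insert(50), 100); // a smaller value never lowers the running max
    assert!(agg.delete(100).is_err());
    println!("running max = {:?}", agg.current); // Some(100)
}

In SQL these functions surface as MAX_APPEND_ONLY(expr) and MIN_APPEND_ONLY(expr), as exercised by the two GROUP BY queries at the end of aggr_append_only.test above.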