From 9baff0880b00fd26f316693a4a2fa09925d45ea6 Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 18:44:54 +0800 Subject: [PATCH 01/13] fix: min, max with append only --- dozer-sql/expression/src/aggregate.rs | 6 + dozer-sql/expression/src/execution.rs | 2 + dozer-sql/src/aggregation/max_append_only.rs | 176 +++++ dozer-sql/src/aggregation/min_append_only.rs | 176 +++++ dozer-sql/src/aggregation/mod.rs | 4 + .../aggregation_max_append_only_tests.rs | 607 ++++++++++++++++++ .../aggregation_min_append_only_tests.rs | 607 ++++++++++++++++++ dozer-sql/src/aggregation/tests/mod.rs | 5 + 8 files changed, 1583 insertions(+) create mode 100644 dozer-sql/src/aggregation/max_append_only.rs create mode 100644 dozer-sql/src/aggregation/min_append_only.rs create mode 100644 dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs create mode 100644 dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs diff --git a/dozer-sql/expression/src/aggregate.rs b/dozer-sql/expression/src/aggregate.rs index d136358979..fd1dcf002c 100644 --- a/dozer-sql/expression/src/aggregate.rs +++ b/dozer-sql/expression/src/aggregate.rs @@ -5,8 +5,10 @@ pub enum AggregateFunctionType { Avg, Count, Max, + MaxAppendOnly, MaxValue, Min, + MinAppendOnly, MinValue, Sum, } @@ -17,8 +19,10 @@ impl AggregateFunctionType { "avg" => Some(AggregateFunctionType::Avg), "count" => Some(AggregateFunctionType::Count), "max" => Some(AggregateFunctionType::Max), + "max_append_only" => Some(AggregateFunctionType::Max), "max_value" => Some(AggregateFunctionType::MaxValue), "min" => Some(AggregateFunctionType::Min), + "min_append_only" => Some(AggregateFunctionType::Min), "min_value" => Some(AggregateFunctionType::MinValue), "sum" => Some(AggregateFunctionType::Sum), _ => None, @@ -32,8 +36,10 @@ impl Display for AggregateFunctionType { AggregateFunctionType::Avg => f.write_str("AVG"), AggregateFunctionType::Count => f.write_str("COUNT"), AggregateFunctionType::Max => f.write_str("MAX"), + AggregateFunctionType::MaxAppendOnly => f.write_str("MAX_APPEND_ONLY"), AggregateFunctionType::MaxValue => f.write_str("MAX_VALUE"), AggregateFunctionType::Min => f.write_str("MIN"), + AggregateFunctionType::MinAppendOnly => f.write_str("MIN_APPEND_ONLY"), AggregateFunctionType::MinValue => f.write_str("MIN_VALUE"), AggregateFunctionType::Sum => f.write_str("SUM"), } diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs index 54e069a743..7f29d89416 100644 --- a/dozer-sql/expression/src/execution.rs +++ b/dozer-sql/expression/src/execution.rs @@ -721,8 +721,10 @@ fn get_aggregate_function_type( AggregateFunctionType::Avg => validate_avg(args, schema), AggregateFunctionType::Count => validate_count(args, schema), AggregateFunctionType::Max => validate_max(args, schema), + AggregateFunctionType::MaxAppendOnly => validate_max(args, schema), AggregateFunctionType::MaxValue => validate_max_value(args, schema), AggregateFunctionType::Min => validate_min(args, schema), + AggregateFunctionType::MinAppendOnly => validate_min(args, schema), AggregateFunctionType::MinValue => validate_min_value(args, schema), AggregateFunctionType::Sum => validate_sum(args, schema), } diff --git a/dozer-sql/src/aggregation/max_append_only.rs b/dozer-sql/src/aggregation/max_append_only.rs new file mode 100644 index 0000000000..d8660b87b7 --- /dev/null +++ b/dozer-sql/src/aggregation/max_append_only.rs @@ -0,0 +1,176 @@ +use crate::aggregation::aggregator::{Aggregator}; + +use crate::errors::{PipelineError, 
UnsupportedSqlError}; +use crate::{calculate_err_field}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MaxAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MaxAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MaxAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MaxAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_max = self.current_state.clone(); + + for val in new { + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u64::MIN, + _ => calculate_err_field!(cur_max.to_uint(), MaxAppendOnly, val), + }; + if new_val > max_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => u128::MIN, + _ => calculate_err_field!(cur_max.to_u128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i64::MIN, + _ => calculate_err_field!(cur_max.to_int(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => i128::MIN, + _ => calculate_err_field!(cur_max.to_i128(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => f64::MIN, + _ => calculate_err_field!(cur_max.to_float(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => Decimal::MIN, + _ => calculate_err_field!(cur_max.to_decimal(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => DateTime::::from(DateTime::::MIN_UTC), + _ => 
calculate_err_field!(cur_max.to_timestamp(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => NaiveDate::MIN, + _ => calculate_err_field!(cur_max.to_date(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MaxAppendOnly, val); + let max_val = match cur_max { + Field::Null => { + DozerDuration(std::time::Duration::ZERO, TimeUnit::Nanoseconds) + } + _ => calculate_err_field!(cur_max.to_duration(), MaxAppendOnly, val), + }; + + if new_val > max_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MaxAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MaxAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/min_append_only.rs b/dozer-sql/src/aggregation/min_append_only.rs new file mode 100644 index 0000000000..d8b0f6618f --- /dev/null +++ b/dozer-sql/src/aggregation/min_append_only.rs @@ -0,0 +1,176 @@ +use crate::aggregation::aggregator::{Aggregator}; + +use crate::errors::{PipelineError, UnsupportedSqlError}; +use crate::{calculate_err_field}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MinAppendOnly; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +pub struct MinAppendOnlyAggregator { + current_state: Field, + return_type: Option, +} + +impl MinAppendOnlyAggregator { + pub fn new() -> Self { + Self { + current_state: Field::Null, + return_type: None, + } + } + + pub fn update_state(&mut self, field: Field) { + self.current_state = field; + } +} + +impl Aggregator for MinAppendOnlyAggregator { + fn init(&mut self, return_type: FieldType) { + self.return_type = Some(return_type); + } + + fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn delete(&mut self, _old: &[Field]) -> Result { + Err(PipelineError::UnsupportedSqlError( + UnsupportedSqlError::GenericError("Append only".to_string()), + )) + } + + fn insert(&mut self, new: &[Field]) -> Result { + let cur_min = self.current_state.clone(); + + for val in new { + match self.return_type { + Some(typ) => match typ { + FieldType::UInt => { + let new_val = calculate_err_field!(val.to_uint(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u64::MAX, + _ => calculate_err_field!(cur_min.to_uint(), MinAppendOnly, val), + }; + if new_val < min_val { + self.update_state(Field::UInt(new_val)); + } + } + FieldType::U128 => { + let new_val = calculate_err_field!(val.to_u128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => u128::MAX, + _ => 
calculate_err_field!(cur_min.to_u128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::U128(new_val)); + } + } + FieldType::Int => { + let new_val = calculate_err_field!(val.to_int(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i64::MAX, + _ => calculate_err_field!(cur_min.to_int(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Int(new_val)); + } + } + FieldType::I128 => { + let new_val = calculate_err_field!(val.to_i128(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => i128::MAX, + _ => calculate_err_field!(cur_min.to_i128(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::I128(new_val)); + } + } + FieldType::Float => { + let new_val = calculate_err_field!(val.to_float(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => f64::MAX, + _ => calculate_err_field!(cur_min.to_float(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Float(OrderedFloat(new_val))); + } + } + FieldType::Decimal => { + let new_val = calculate_err_field!(val.to_decimal(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => Decimal::MAX, + _ => calculate_err_field!(cur_min.to_decimal(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Decimal(new_val)); + } + } + FieldType::Timestamp => { + let new_val = calculate_err_field!(val.to_timestamp(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => DateTime::::from(DateTime::::MAX_UTC), + _ => calculate_err_field!(cur_min.to_timestamp(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Timestamp(new_val)); + } + } + FieldType::Date => { + let new_val = calculate_err_field!(val.to_date(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => NaiveDate::MAX, + _ => calculate_err_field!(cur_min.to_date(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Date(new_val)); + } + } + FieldType::Duration => { + let new_val = calculate_err_field!(val.to_duration(), MinAppendOnly, val); + let min_val = match cur_min { + Field::Null => { + DozerDuration(std::time::Duration::MAX, TimeUnit::Nanoseconds) + } + _ => calculate_err_field!(cur_min.to_duration(), MinAppendOnly, val), + }; + + if new_val < min_val { + self.update_state(Field::Duration(new_val)); + } + } + FieldType::Boolean + | FieldType::String + | FieldType::Text + | FieldType::Binary + | FieldType::Json + | FieldType::Point => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported return type {typ} for {MinAppendOnly}" + ))); + } + }, + None => { + return Err(PipelineError::InvalidReturnType(format!( + "Not supported None return type for {MinAppendOnly}" + ))) + } + } + } + Ok(self.current_state.clone()) + } +} diff --git a/dozer-sql/src/aggregation/mod.rs b/dozer-sql/src/aggregation/mod.rs index 1feae0a3e4..7acf6db3db 100644 --- a/dozer-sql/src/aggregation/mod.rs +++ b/dozer-sql/src/aggregation/mod.rs @@ -9,3 +9,7 @@ pub mod min_value; pub mod processor; pub mod sum; mod tests; + +pub mod max_append_only; +pub mod max_value_append_only; +pub mod min_append_only; diff --git a/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs new file mode 100644 index 0000000000..1b13a1e37a --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs @@ -0,0 +1,607 
@@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, + init_input_schema, init_processor, insert_exp, insert_field, update_exp, + DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_max_aggregation_float() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_uint() { + let schema = init_input_schema(UInt, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, 
FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MAX_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP 
BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MAX_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + ------------------ + MAX_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MAX_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_int_null() { + let schema = init_input_schema(Int, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_float_null() { + let schema = init_input_schema(Float, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, 
schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MAX_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MAX_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_max_aggregation_date_null() { + let schema = init_input_schema(Date, "MAX_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MAX_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + 
HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MAX_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MAX_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs new file mode 100644 index 0000000000..95d76c2255 --- /dev/null +++ b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs @@ -0,0 +1,607 @@ +use crate::aggregation::tests::aggregation_tests_utils::{ + get_date_field, get_decimal_field, get_duration_field, get_ts_field, + init_input_schema, init_processor, insert_exp, insert_field, update_exp, + DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, +}; +use crate::output; +use dozer_core::DEFAULT_PORT_HANDLE; + +use dozer_types::types::FieldType::{Date, Decimal, Duration, Float, Int, Timestamp, UInt}; +use std::collections::HashMap; + +#[test] +fn test_min_aggregation_float() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_FLOAT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_100_FLOAT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_FLOAT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_INT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, 
ITALY, FIELD_100_INT, FIELD_100_INT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_INT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_uint() { + let schema = init_input_schema(UInt, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, FIELD_100_UINT); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_UINT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_100_UINT)]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + --------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, FIELD_50_UINT); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, FIELD_50_UINT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_decimal_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_decimal_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_decimal_field(100), + &get_decimal_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_decimal_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_decimal_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + let mut inp = insert_field(ITALY, &get_duration_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_duration_field(100))]; + assert_eq!(out, 
exp); + + // Insert another 100 for segment Italy + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_duration_field(100), + &get_duration_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100.0 + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + + Singapore, 50.0 + ------------- + MIN_APPEND_ONLY = 50.0 + */ + inp = insert_field(SINGAPORE, &get_duration_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_duration_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 100 for segment Italy + /* + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + let mut inp = insert_field(ITALY, &get_ts_field(100)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_ts_field(100))]; + assert_eq!(out, exp); + + // Insert another 100 for segment Italy + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_ts_field(100), + &get_ts_field(100), + )]; + assert_eq!(out, exp); + + // Insert 50 for segment Singapore + /* + Italy, 100 + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + + Singapore, 50 + ------------- + MIN_APPEND_ONLY = 50 + */ + inp = insert_field(SINGAPORE, &get_ts_field(50)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_ts_field(50))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + ------------------ + MIN_APPEND_ONLY = 2015-10-08 + */ + let mut inp = insert_field(ITALY, &get_date_field(DATE8)); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, &get_date_field(DATE8))]; + assert_eq!(out, exp); + + // Insert another 2015-10-08 for segment Italy + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ----------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + &get_date_field(DATE8), + &get_date_field(DATE8), + )]; + assert_eq!(out, exp); + + // Insert 2015-10-04 for segment Singapore + /* + Italy, 2015-10-08 + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + + Singapore, 2015-10-04 + ------------- + MIN_APPEND_ONLY = 2015-10-04 + */ + inp = insert_field(SINGAPORE, &get_date_field(DATE4)); + out = output!(processor, inp); + exp = vec![insert_exp(SINGAPORE, &get_date_field(DATE4))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_int_null() { + let schema = init_input_schema(Int, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users 
\ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, FIELD_100_INT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_INT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_float_null() { + let schema = init_input_schema(Float, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, FIELD_100_FLOAT); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, FIELD_100_FLOAT)]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_decimal_null() { + let schema = init_input_schema(Decimal, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_decimal_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_decimal_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_duration_null() { + let schema = init_input_schema(Duration, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100.0 + ------------- + MIN_APPEND_ONLY = 100.0 + */ + inp = insert_field(ITALY, &get_duration_field(100)); + out = output!(processor, inp); + exp = vec![update_exp( + ITALY, + ITALY, + FIELD_NULL, + &get_duration_field(100), + )]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_timestamp_null() { + let schema = init_input_schema(Timestamp, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, 
MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 100 for segment Italy + /* + Italy, NULL + Italy, 100 + ------------- + MIN_APPEND_ONLY = 100 + */ + inp = insert_field(ITALY, &get_ts_field(100)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_ts_field(100))]; + assert_eq!(out, exp); +} + +#[test] +fn test_min_aggregation_date_null() { + let schema = init_input_schema(Date, "MIN_APPEND_ONLY"); + let mut processor = init_processor( + "SELECT Country, MIN_APPEND_ONLY(Salary) \ + FROM Users \ + WHERE Salary >= 1 GROUP BY Country", + HashMap::from([(DEFAULT_PORT_HANDLE, schema)]), + ) + .unwrap(); + + // Insert NULL for segment Italy + /* + Italy, NULL + ------------- + MIN_APPEND_ONLY = NULL + */ + let mut inp = insert_field(ITALY, FIELD_NULL); + let mut out = output!(processor, inp); + let mut exp = vec![insert_exp(ITALY, FIELD_NULL)]; + assert_eq!(out, exp); + + // Insert 2015-10-08 for segment Italy + /* + Italy, NULL + Italy, 2015-10-08 + ------------- + MIN_APPEND_ONLY = 2015-10-08 + */ + inp = insert_field(ITALY, &get_date_field(DATE8)); + out = output!(processor, inp); + exp = vec![update_exp(ITALY, ITALY, FIELD_NULL, &get_date_field(DATE8))]; + assert_eq!(out, exp); +} diff --git a/dozer-sql/src/aggregation/tests/mod.rs b/dozer-sql/src/aggregation/tests/mod.rs index 99436725e0..9d5eafa52c 100644 --- a/dozer-sql/src/aggregation/tests/mod.rs +++ b/dozer-sql/src/aggregation/tests/mod.rs @@ -20,3 +20,8 @@ mod aggregation_sum_tests; mod aggregation_test_planner; #[cfg(test)] mod aggregation_tests_utils; + +#[cfg(test)] +mod aggregation_max_append_only_tests; +#[cfg(test)] +mod aggregation_min_append_only_tests; From 202f108aa25b47d39ec61f1cf8caa0ab41ee2190 Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 18:45:45 +0800 Subject: [PATCH 02/13] fix: min, max with append only --- dozer-sql/src/aggregation/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/dozer-sql/src/aggregation/mod.rs b/dozer-sql/src/aggregation/mod.rs index 7acf6db3db..21bfebdfc8 100644 --- a/dozer-sql/src/aggregation/mod.rs +++ b/dozer-sql/src/aggregation/mod.rs @@ -11,5 +11,4 @@ pub mod sum; mod tests; pub mod max_append_only; -pub mod max_value_append_only; pub mod min_append_only; From e06948a6cfe49b8ee614384ae236377f89b3e9bb Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 19:12:12 +0800 Subject: [PATCH 03/13] fix: min, max with append only --- dozer-sql/src/aggregation/max_append_only.rs | 4 ++-- dozer-sql/src/aggregation/min_append_only.rs | 4 ++-- .../aggregation/tests/aggregation_max_append_only_tests.rs | 7 ++++--- .../aggregation/tests/aggregation_min_append_only_tests.rs | 7 ++++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/dozer-sql/src/aggregation/max_append_only.rs b/dozer-sql/src/aggregation/max_append_only.rs index d8660b87b7..0a3259bd7b 100644 --- a/dozer-sql/src/aggregation/max_append_only.rs +++ b/dozer-sql/src/aggregation/max_append_only.rs @@ -1,7 +1,7 @@ -use crate::aggregation::aggregator::{Aggregator}; +use crate::aggregation::aggregator::Aggregator; +use crate::calculate_err_field; use crate::errors::{PipelineError, 
UnsupportedSqlError}; -use crate::{calculate_err_field}; use dozer_sql_expression::aggregate::AggregateFunctionType::MaxAppendOnly; use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; diff --git a/dozer-sql/src/aggregation/min_append_only.rs b/dozer-sql/src/aggregation/min_append_only.rs index d8b0f6618f..f7c2fe3ea5 100644 --- a/dozer-sql/src/aggregation/min_append_only.rs +++ b/dozer-sql/src/aggregation/min_append_only.rs @@ -1,7 +1,7 @@ -use crate::aggregation::aggregator::{Aggregator}; +use crate::aggregation::aggregator::Aggregator; +use crate::calculate_err_field; use crate::errors::{PipelineError, UnsupportedSqlError}; -use crate::{calculate_err_field}; use dozer_sql_expression::aggregate::AggregateFunctionType::MinAppendOnly; use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc}; diff --git a/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs index 1b13a1e37a..d26fbf2004 100644 --- a/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs +++ b/dozer-sql/src/aggregation/tests/aggregation_max_append_only_tests.rs @@ -1,7 +1,8 @@ use crate::aggregation::tests::aggregation_tests_utils::{ - get_date_field, get_decimal_field, get_duration_field, get_ts_field, - init_input_schema, init_processor, insert_exp, insert_field, update_exp, - DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, + get_date_field, get_decimal_field, get_duration_field, get_ts_field, init_input_schema, + init_processor, insert_exp, insert_field, update_exp, DATE4, DATE8, FIELD_100_FLOAT, + FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, + SINGAPORE, }; use crate::output; use dozer_core::DEFAULT_PORT_HANDLE; diff --git a/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs index 95d76c2255..7b66d5cac2 100644 --- a/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs +++ b/dozer-sql/src/aggregation/tests/aggregation_min_append_only_tests.rs @@ -1,7 +1,8 @@ use crate::aggregation::tests::aggregation_tests_utils::{ - get_date_field, get_decimal_field, get_duration_field, get_ts_field, - init_input_schema, init_processor, insert_exp, insert_field, update_exp, - DATE4, DATE8, FIELD_100_FLOAT, FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, SINGAPORE, + get_date_field, get_decimal_field, get_duration_field, get_ts_field, init_input_schema, + init_processor, insert_exp, insert_field, update_exp, DATE4, DATE8, FIELD_100_FLOAT, + FIELD_100_INT, FIELD_100_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY, + SINGAPORE, }; use crate::output; use dozer_core::DEFAULT_PORT_HANDLE; From dd4d97eaf470fab70577fe1247bbef10f90923ef Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 20:09:10 +0800 Subject: [PATCH 04/13] fix: min, max with append only --- dozer-sql/src/aggregation/aggregator.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs index 14fef33f84..8655fd5032 100644 --- a/dozer-sql/src/aggregation/aggregator.rs +++ b/dozer-sql/src/aggregation/aggregator.rs @@ -17,9 +17,11 @@ use dozer_sql_expression::execution::Expression; use crate::aggregation::max_value::MaxValueAggregator; use 
crate::aggregation::min_value::MinValueAggregator; use crate::errors::PipelineError::{InvalidFunctionArgument, InvalidValue}; -use dozer_sql_expression::aggregate::AggregateFunctionType::MaxValue; +use dozer_sql_expression::aggregate::AggregateFunctionType::{MaxAppendOnly, MaxValue, MinAppendOnly}; use dozer_types::types::{Field, FieldType, Schema}; use std::fmt::{Debug, Display, Formatter}; +use crate::aggregation::max_append_only::MaxAppendOnlyAggregator; +use crate::aggregation::min_append_only::MinAppendOnlyAggregator; #[enum_dispatch] pub trait Aggregator: Send + Sync + Serialize + DeserializeOwned { @@ -35,8 +37,10 @@ pub trait Aggregator: Send + Sync + Serialize + DeserializeOwned { pub enum AggregatorEnum { AvgAggregator, MinAggregator, + MinAppendOnlyAggregator, MinValueAggregator, MaxAggregator, + MaxAppendOnlyAggregator, MaxValueAggregator, SumAggregator, CountAggregator, @@ -47,8 +51,10 @@ pub enum AggregatorType { Avg, Count, Max, + MaxAppendOnly, MaxValue, Min, + MinAppendOnly, MinValue, Sum, } @@ -59,8 +65,10 @@ impl Display for AggregatorType { AggregatorType::Avg => f.write_str("avg"), AggregatorType::Count => f.write_str("count"), AggregatorType::Max => f.write_str("max"), + AggregatorType::MaxAppendOnly => f.write_str("max_append_only"), AggregatorType::MaxValue => f.write_str("max_value"), AggregatorType::Min => f.write_str("min"), + AggregatorType::MinAppendOnly => f.write_str("min_append_only"), AggregatorType::MinValue => f.write_str("min_value"), AggregatorType::Sum => f.write_str("sum"), } @@ -72,8 +80,10 @@ pub fn get_aggregator_from_aggregator_type(typ: AggregatorType) -> AggregatorEnu AggregatorType::Avg => AvgAggregator::new().into(), AggregatorType::Count => CountAggregator::new().into(), AggregatorType::Max => MaxAggregator::new().into(), + AggregatorType::MaxAppendOnly => MaxAppendOnlyAggregator::new().into(), AggregatorType::MaxValue => MaxValueAggregator::new().into(), AggregatorType::Min => MinAggregator::new().into(), + AggregatorType::MinAppendOnly => MinAppendOnlyAggregator::new().into(), AggregatorType::MinValue => MinValueAggregator::new().into(), AggregatorType::Sum => SumAggregator::new().into(), } From 082f696a4c3b058d4a8e0277e47352dbe0e02d38 Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 20:40:39 +0800 Subject: [PATCH 05/13] fix: min, max with append only --- dozer-sql/src/aggregation/aggregator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs index 8655fd5032..5ba3b0a8a8 100644 --- a/dozer-sql/src/aggregation/aggregator.rs +++ b/dozer-sql/src/aggregation/aggregator.rs @@ -17,7 +17,7 @@ use dozer_sql_expression::execution::Expression; use crate::aggregation::max_value::MaxValueAggregator; use crate::aggregation::min_value::MinValueAggregator; use crate::errors::PipelineError::{InvalidFunctionArgument, InvalidValue}; -use dozer_sql_expression::aggregate::AggregateFunctionType::{MaxAppendOnly, MaxValue, MinAppendOnly}; +use dozer_sql_expression::aggregate::AggregateFunctionType::{MaxValue}; use dozer_types::types::{Field, FieldType, Schema}; use std::fmt::{Debug, Display, Formatter}; use crate::aggregation::max_append_only::MaxAppendOnlyAggregator; From 3c6f472445116beff0cca542ca92ba8e5dcec90a Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 21:08:40 +0800 Subject: [PATCH 06/13] fix: min, max with append only --- dozer-sql/src/aggregation/aggregator.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff 
--git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs index 5ba3b0a8a8..d294d2dc02 100644 --- a/dozer-sql/src/aggregation/aggregator.rs +++ b/dozer-sql/src/aggregation/aggregator.rs @@ -118,6 +118,18 @@ pub fn get_aggregator_type_from_aggregation_expression( .clone()], AggregatorType::Min, )), + Expression::AggregateFunction { + fun: AggregateFunctionType::MinAppendOnly, + args, + } => Ok(( + vec![args + .get(0) + .ok_or_else(|| { + PipelineError::NotEnoughArguments(AggregateFunctionType::MinAppendOnly.to_string()) + })? + .clone()], + AggregatorType::MinAppendOnly, + )), Expression::AggregateFunction { fun: AggregateFunctionType::Max, args, @@ -130,6 +142,18 @@ pub fn get_aggregator_type_from_aggregation_expression( .clone()], AggregatorType::Max, )), + Expression::AggregateFunction { + fun: AggregateFunctionType::MaxAppendOnly, + args, + } => Ok(( + vec![args + .get(0) + .ok_or_else(|| { + PipelineError::NotEnoughArguments(AggregateFunctionType::MaxAppendOnly.to_string()) + })? + .clone()], + AggregatorType::MaxAppendOnly, + )), Expression::AggregateFunction { fun: AggregateFunctionType::MaxValue, args, From eb8fef73e3c447087723ea35dc55769882dca1e4 Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 21:29:00 +0800 Subject: [PATCH 07/13] fix: min, max with append only --- dozer-sql/src/aggregation/aggregator.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs index d294d2dc02..bd85a1d0ad 100644 --- a/dozer-sql/src/aggregation/aggregator.rs +++ b/dozer-sql/src/aggregation/aggregator.rs @@ -14,14 +14,14 @@ use std::collections::BTreeMap; use dozer_sql_expression::aggregate::AggregateFunctionType; use dozer_sql_expression::execution::Expression; +use crate::aggregation::max_append_only::MaxAppendOnlyAggregator; use crate::aggregation::max_value::MaxValueAggregator; +use crate::aggregation::min_append_only::MinAppendOnlyAggregator; use crate::aggregation::min_value::MinValueAggregator; use crate::errors::PipelineError::{InvalidFunctionArgument, InvalidValue}; -use dozer_sql_expression::aggregate::AggregateFunctionType::{MaxValue}; +use dozer_sql_expression::aggregate::AggregateFunctionType::MaxValue; use dozer_types::types::{Field, FieldType, Schema}; use std::fmt::{Debug, Display, Formatter}; -use crate::aggregation::max_append_only::MaxAppendOnlyAggregator; -use crate::aggregation::min_append_only::MinAppendOnlyAggregator; #[enum_dispatch] pub trait Aggregator: Send + Sync + Serialize + DeserializeOwned { @@ -125,7 +125,9 @@ pub fn get_aggregator_type_from_aggregation_expression( vec![args .get(0) .ok_or_else(|| { - PipelineError::NotEnoughArguments(AggregateFunctionType::MinAppendOnly.to_string()) + PipelineError::NotEnoughArguments( + AggregateFunctionType::MinAppendOnly.to_string(), + ) })? .clone()], AggregatorType::MinAppendOnly, @@ -149,7 +151,9 @@ pub fn get_aggregator_type_from_aggregation_expression( vec![args .get(0) .ok_or_else(|| { - PipelineError::NotEnoughArguments(AggregateFunctionType::MaxAppendOnly.to_string()) + PipelineError::NotEnoughArguments( + AggregateFunctionType::MaxAppendOnly.to_string(), + ) })? 
.clone()], AggregatorType::MaxAppendOnly, From 96f30d48339a8338a6cfed0b0447e943cb286c3a Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 21:35:27 +0800 Subject: [PATCH 08/13] fix: min, max with append only --- dozer-sql/expression/src/aggregate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dozer-sql/expression/src/aggregate.rs b/dozer-sql/expression/src/aggregate.rs index fd1dcf002c..6e1e3d2a6e 100644 --- a/dozer-sql/expression/src/aggregate.rs +++ b/dozer-sql/expression/src/aggregate.rs @@ -19,10 +19,10 @@ impl AggregateFunctionType { "avg" => Some(AggregateFunctionType::Avg), "count" => Some(AggregateFunctionType::Count), "max" => Some(AggregateFunctionType::Max), - "max_append_only" => Some(AggregateFunctionType::Max), + "max_append_only" => Some(AggregateFunctionType::MaxAppendOnly), "max_value" => Some(AggregateFunctionType::MaxValue), "min" => Some(AggregateFunctionType::Min), - "min_append_only" => Some(AggregateFunctionType::Min), + "min_append_only" => Some(AggregateFunctionType::MinAppendOnly), "min_value" => Some(AggregateFunctionType::MinValue), "sum" => Some(AggregateFunctionType::Sum), _ => None, From d135f41741751efa8aaa57bbf91ca8b69139238a Mon Sep 17 00:00:00 2001 From: Chloe Kim Date: Wed, 4 Oct 2023 22:49:20 +0800 Subject: [PATCH 09/13] fix: min, max with append only --- dozer-sql/expression/src/execution.rs | 99 +++++++++++++++++++- dozer-sql/src/aggregation/max_append_only.rs | 3 + dozer-sql/src/aggregation/min_append_only.rs | 3 + 3 files changed, 101 insertions(+), 4 deletions(-) diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs index 7f29d89416..8eddbde591 100644 --- a/dozer-sql/expression/src/execution.rs +++ b/dozer-sql/expression/src/execution.rs @@ -721,10 +721,10 @@ fn get_aggregate_function_type( AggregateFunctionType::Avg => validate_avg(args, schema), AggregateFunctionType::Count => validate_count(args, schema), AggregateFunctionType::Max => validate_max(args, schema), - AggregateFunctionType::MaxAppendOnly => validate_max(args, schema), + AggregateFunctionType::MaxAppendOnly => validate_max_append_only(args, schema), AggregateFunctionType::MaxValue => validate_max_value(args, schema), AggregateFunctionType::Min => validate_min(args, schema), - AggregateFunctionType::MinAppendOnly => validate_min(args, schema), + AggregateFunctionType::MinAppendOnly => validate_min_append_only(args, schema), AggregateFunctionType::MinValue => validate_min_value(args, schema), AggregateFunctionType::Sum => validate_sum(args, schema), } @@ -829,9 +829,9 @@ fn validate_max(args: &[Expression], schema: &Schema) -> Result Result { - let arg = validate_one_argument(args, schema, AggregateFunctionType::Min)?; + let (base_arg, arg) = validate_two_arguments(args, schema, AggregateFunctionType::Min)?; - let ret_type = match arg.return_type { + match base_arg.return_type { FieldType::UInt => FieldType::UInt, FieldType::U128 => FieldType::U128, FieldType::Int => FieldType::Int, @@ -850,6 +850,97 @@ fn validate_min(args: &[Expression], schema: &Schema) -> Result Result { + let arg = validate_one_argument(args, schema, AggregateFunctionType::MaxAppendOnly)?; + + let ret_type = match arg.return_type { + FieldType::UInt => FieldType::UInt, + FieldType::U128 => FieldType::U128, + FieldType::Int => FieldType::Int, + FieldType::I128 => FieldType::I128, + FieldType::Float => FieldType::Float, + FieldType::Decimal => FieldType::Decimal, + FieldType::Timestamp => FieldType::Timestamp, + FieldType::Date => 
+        FieldType::Duration => FieldType::Duration,
+        FieldType::Boolean
+        | FieldType::String
+        | FieldType::Text
+        | FieldType::Binary
+        | FieldType::Json
+        | FieldType::Point => {
+            return Err(Error::InvalidFunctionArgumentType {
+                function_name: AggregateFunctionType::MaxAppendOnly.to_string(),
+                argument_index: 0,
+                actual: arg.return_type,
+                expected: vec![
+                    FieldType::Decimal,
+                    FieldType::UInt,
+                    FieldType::U128,
+                    FieldType::Int,
+                    FieldType::I128,
+                    FieldType::Float,
+                    FieldType::Timestamp,
+                    FieldType::Date,
+                    FieldType::Duration,
+                ],
+            });
+        }
+    };
+    Ok(ExpressionType::new(
+        ret_type,
+        true,
+        SourceDefinition::Dynamic,
+        false,
+    ))
+}
+
+fn validate_min_append_only(args: &[Expression], schema: &Schema) -> Result {
+    let arg = validate_one_argument(args, schema, AggregateFunctionType::MinAppendOnly)?;
+
+    let ret_type = match arg.return_type {
+        FieldType::UInt => FieldType::UInt,
+        FieldType::U128 => FieldType::U128,
+        FieldType::Int => FieldType::Int,
+        FieldType::I128 => FieldType::I128,
+        FieldType::Float => FieldType::Float,
+        FieldType::Decimal => FieldType::Decimal,
+        FieldType::Timestamp => FieldType::Timestamp,
+        FieldType::Date => FieldType::Date,
+        FieldType::Duration => FieldType::Duration,
+        FieldType::Boolean
+        | FieldType::String
+        | FieldType::Text
+        | FieldType::Binary
+        | FieldType::Json
+        | FieldType::Point => {
+            return Err(Error::InvalidFunctionArgumentType {
+                function_name: AggregateFunctionType::MinAppendOnly.to_string(),
+                argument_index: 0,
+                actual: arg.return_type,
+                expected: vec![
+                    FieldType::Decimal,
diff --git a/dozer-sql/src/aggregation/max_append_only.rs b/dozer-sql/src/aggregation/max_append_only.rs
index 0a3259bd7b..e17705f664 100644
--- a/dozer-sql/src/aggregation/max_append_only.rs
+++ b/dozer-sql/src/aggregation/max_append_only.rs
@@ -51,6 +51,9 @@ impl Aggregator for MaxAppendOnlyAggregator {
         let cur_max = self.current_state.clone();
 
         for val in new {
+            if val == &Field::Null {
+                continue;
+            }
             match self.return_type {
                 Some(typ) => match typ {
                     FieldType::UInt => {
diff --git a/dozer-sql/src/aggregation/min_append_only.rs b/dozer-sql/src/aggregation/min_append_only.rs
index f7c2fe3ea5..952e00ca7d 100644
--- a/dozer-sql/src/aggregation/min_append_only.rs
+++ b/dozer-sql/src/aggregation/min_append_only.rs
@@ -51,6 +51,9 @@ impl Aggregator for MinAppendOnlyAggregator {
         let cur_min = self.current_state.clone();
 
         for val in new {
+            if val == &Field::Null {
+                continue;
+            }
             match self.return_type {
                 Some(typ) => match typ {
                     FieldType::UInt => {

From 579b43b2a80f4b16e802841a9d0bc8182b15b737 Mon Sep 17 00:00:00 2001
From: Chloe Kim
Date: Wed, 4 Oct 2023 23:06:28 +0800
Subject: [PATCH 10/13] fix: min, max with append only

---
 dozer-sql/expression/src/execution.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs
index 8eddbde591..6fb752cc34 100644
--- a/dozer-sql/expression/src/execution.rs
+++ b/dozer-sql/expression/src/execution.rs
@@ -829,9 +829,9 @@ fn validate_max(args: &[Expression], schema: &Schema) -> Result Result {
-    let (base_arg, arg) = validate_two_arguments(args, schema, AggregateFunctionType::Min)?;
+    let arg = validate_one_argument(args, schema, AggregateFunctionType::Min)?;
 
-    match base_arg.return_type {
+    let ret_type = match arg.return_type {
         FieldType::UInt => FieldType::UInt,
         FieldType::U128 => FieldType::U128,
         FieldType::Int => FieldType::Int,

From de35b7be526fdc6c577c195eaaeda59c68c67010 Mon Sep 17 00:00:00 2001
From: Chloe Kim
Date: Wed, 4 Oct 2023 23:11:05 +0800
Subject: [PATCH 11/13] fix: min, max with append only

---
 .../src/sql_tests/full/aggr_append_only.test | 102 ++++++++++++++++++
 1 file changed, 102 insertions(+)
 create mode 100644 dozer-tests/src/sql_tests/full/aggr_append_only.test

diff --git a/dozer-tests/src/sql_tests/full/aggr_append_only.test b/dozer-tests/src/sql_tests/full/aggr_append_only.test
new file mode 100644
index 0000000000..9a87d0e90c
--- /dev/null
+++ b/dozer-tests/src/sql_tests/full/aggr_append_only.test
@@ -0,0 +1,102 @@
+control sortmode rowsort
+
+statement ok
+CREATE TABLE Users (
+    id integer NOT NULL,
+    Name text NOT NULL,
+    City text,
+    Country text,
+    Salary numeric NOT NULL
+)
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (1, 'John Smith', 'New York', 'USA', 50000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (2, 'Jane Doe', 'Los Angeles', 'USA', 60000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (3, 'Mike Johnson', 'Chicago', 'USA', 55000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (4, 'Karen Davis', 'Dallas', 'USA', 45000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (5, 'David Lee', 'Houston', 'USA', 65000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (6, 'Sarah Johnson', 'Miami', 'USA', 75000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (7, 'Tom Smith', 'Toronto', 'Canada', 80000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (8, 'Emily Davis', 'Seattle', 'USA', 55000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (9, 'Kevin Kim', 'Vancouver', 'Canada', 60000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (10, 'Maria Rodriguez', 'Mexico City', 'Mexico', 70000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (11, 'Hiroshi Yamamoto', 'Tokyo', 'Japan', 75000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (12, 'Anna Nguyen', 'Sydney', 'Australia', 65000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (13, 'Lee Min Ho', 'Seoul', 'South Korea', 85000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (14, 'Isabella Martinez', 'Madrid', 'Spain', 60000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (15, 'Luca Rossi', 'Rome', 'Italy', 50000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (16, 'Sophie Dupont', 'Paris', 'France', 70000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (17, 'Miguel Hernandez', 'Barcelona', 'Spain', 75000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (18, 'Emma Smith', 'London', 'UK', 80000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (19, 'Sven Gustavsson', 'Stockholm', 'Sweden', 65000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (20, 'Leila Ahmed', 'Cairo', 'Egypt', 55000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (21, 'Javier Torres', 'Buenos Aires', 'Argentina', 60000);
+
+statement ok
+INSERT INTO Users (id, Name, City, Country, Salary) VALUES (22, 'Hans Muller', 'Berlin', 'Germany', 70000);
+
+
+query TR
+SELECT Country, MAX_APPEND_ONLY(Salary) FROM Users GROUP BY Country;
+----
+Canada 80000
+Egypt 55000
+France 70000
+Germany 70000
+South Korea 85000
+Spain 75000
+UK 80000
+USA 75000
+
+query TR
+SELECT Country, MIN_APPEND_ONLY(Salary) FROM Users WHERE Salary >= 1 GROUP BY Country;
+----
+Canada 60000
+Egypt 55000
+France 70000
+Germany 70000
+South Korea 85000
+Spain 60000
+UK 80000
+USA 45000
+

From 1b8ddc37de94cb3cc0efc5eff0496db2792a3542 Mon Sep 17 00:00:00 2001
From: Chloe Kim
Date: Wed, 4 Oct 2023 23:19:46 +0800
Subject: [PATCH 12/13] fix: min, max with append only

---
 dozer-sql/expression/src/execution.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs
index 6fb752cc34..a7a618277e 100644
--- a/dozer-sql/expression/src/execution.rs
+++ b/dozer-sql/expression/src/execution.rs
@@ -850,7 +850,7 @@ fn validate_min(args: &[Expression], schema: &Schema) -> Result Result Result

From 077d8d61dbc747dbfa49ae4e2ed0a0d24dba2b5c Mon Sep 17 00:00:00 2001
From: Chloe Kim
Date: Thu, 5 Oct 2023 01:28:08 +0800
Subject: [PATCH 13/13] fix: min, max with append only

---
 dozer-tests/src/sql_tests/full/aggr_append_only.test | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/dozer-tests/src/sql_tests/full/aggr_append_only.test b/dozer-tests/src/sql_tests/full/aggr_append_only.test
index 9a87d0e90c..3f5e8b6aca 100644
--- a/dozer-tests/src/sql_tests/full/aggr_append_only.test
+++ b/dozer-tests/src/sql_tests/full/aggr_append_only.test
@@ -79,24 +79,36 @@ INSERT INTO Users (id, Name, City, Country, Salary) VALUES (22, 'Hans Muller', '
 query TR
 SELECT Country, MAX_APPEND_ONLY(Salary) FROM Users GROUP BY Country;
 ----
+Argentina 60000
+Australia 65000
 Canada 80000
 Egypt 55000
 France 70000
 Germany 70000
+Italy 50000
+Japan 75000
+Mexico 70000
 South Korea 85000
 Spain 75000
+Sweden 65000
 UK 80000
 USA 75000
 
 query TR
 SELECT Country, MIN_APPEND_ONLY(Salary) FROM Users WHERE Salary >= 1 GROUP BY Country;
 ----
+Argentina 60000
+Australia 65000
 Canada 60000
 Egypt 55000
 France 70000
 Germany 70000
+Italy 50000
+Japan 75000
+Mexico 70000
 South Korea 85000
 Spain 60000
+Sweden 65000
 UK 80000
 USA 45000
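
The idea behind the append-only variants in this series is that an append-only source only ever emits inserts, so MIN/MAX can be tracked with a single running value; updates and deletes are rejected because retracting a stored extreme would require keeping the full history of seen values. The standalone Rust sketch below illustrates that idea only; it is not the Dozer implementation, and the type and method names in it (MaxAppendOnlySketch, insert, delete) are invented for the example.

// Illustrative sketch: a single-value MAX aggregator over an append-only
// stream of i64 values. Not the Dozer code; names are made up for the example.
#[derive(Debug, Default)]
struct MaxAppendOnlySketch {
    current: Option<i64>,
}

impl MaxAppendOnlySketch {
    // Inserts are the only operation an append-only source produces, so the
    // aggregator just keeps the larger of the stored value and the new one.
    fn insert(&mut self, value: i64) -> i64 {
        let max = match self.current {
            Some(cur) if cur >= value => cur,
            _ => value,
        };
        self.current = Some(max);
        max
    }

    // A delete would need retraction support (for example a histogram of all
    // seen values), so it is refused, mirroring the "Append only" errors
    // returned by the update/delete paths in the patches above.
    fn delete(&mut self, _value: i64) -> Result<i64, String> {
        Err("append-only aggregator cannot process deletes".to_string())
    }
}

fn main() {
    let mut max = MaxAppendOnlySketch::default();
    assert_eq!(max.insert(50_000), 50_000);
    assert_eq!(max.insert(45_000), 50_000); // a smaller input leaves the max unchanged
    assert_eq!(max.insert(80_000), 80_000);
    assert!(max.delete(80_000).is_err());
    println!("running max: {:?}", max.current); // Some(80000)
}

This is also why the tests in aggr_append_only.test only ever INSERT rows before querying MAX_APPEND_ONLY and MIN_APPEND_ONLY: on such workloads the single-value state gives the same results as the regular MAX/MIN aggregators while holding far less state.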