Skip to content

Commit

Permalink
fix: min, max with append only
Browse files Browse the repository at this point in the history
  • Loading branch information
chloeminkyung committed Oct 4, 2023
1 parent c0d1a7b commit 9baff08
Show file tree
Hide file tree
Showing 8 changed files with 1,583 additions and 0 deletions.
6 changes: 6 additions & 0 deletions dozer-sql/expression/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ pub enum AggregateFunctionType {
Avg,
Count,
Max,
MaxAppendOnly,
MaxValue,
Min,
MinAppendOnly,
MinValue,
Sum,
}
Expand All @@ -17,8 +19,10 @@ impl AggregateFunctionType {
"avg" => Some(AggregateFunctionType::Avg),
"count" => Some(AggregateFunctionType::Count),
"max" => Some(AggregateFunctionType::Max),
"max_append_only" => Some(AggregateFunctionType::Max),
"max_value" => Some(AggregateFunctionType::MaxValue),
"min" => Some(AggregateFunctionType::Min),
"min_append_only" => Some(AggregateFunctionType::Min),
"min_value" => Some(AggregateFunctionType::MinValue),
"sum" => Some(AggregateFunctionType::Sum),
_ => None,
Expand All @@ -32,8 +36,10 @@ impl Display for AggregateFunctionType {
AggregateFunctionType::Avg => f.write_str("AVG"),
AggregateFunctionType::Count => f.write_str("COUNT"),
AggregateFunctionType::Max => f.write_str("MAX"),
AggregateFunctionType::MaxAppendOnly => f.write_str("MAX_APPEND_ONLY"),
AggregateFunctionType::MaxValue => f.write_str("MAX_VALUE"),
AggregateFunctionType::Min => f.write_str("MIN"),
AggregateFunctionType::MinAppendOnly => f.write_str("MIN_APPEND_ONLY"),
AggregateFunctionType::MinValue => f.write_str("MIN_VALUE"),
AggregateFunctionType::Sum => f.write_str("SUM"),
}
Expand Down
2 changes: 2 additions & 0 deletions dozer-sql/expression/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,10 @@ fn get_aggregate_function_type(
AggregateFunctionType::Avg => validate_avg(args, schema),
AggregateFunctionType::Count => validate_count(args, schema),
AggregateFunctionType::Max => validate_max(args, schema),
AggregateFunctionType::MaxAppendOnly => validate_max(args, schema),
AggregateFunctionType::MaxValue => validate_max_value(args, schema),
AggregateFunctionType::Min => validate_min(args, schema),
AggregateFunctionType::MinAppendOnly => validate_min(args, schema),
AggregateFunctionType::MinValue => validate_min_value(args, schema),
AggregateFunctionType::Sum => validate_sum(args, schema),
}
Expand Down
176 changes: 176 additions & 0 deletions dozer-sql/src/aggregation/max_append_only.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use crate::aggregation::aggregator::{Aggregator};

use crate::errors::{PipelineError, UnsupportedSqlError};
use crate::{calculate_err_field};
use dozer_sql_expression::aggregate::AggregateFunctionType::MaxAppendOnly;

use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc};
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::rust_decimal::Decimal;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit};

#[derive(Debug, Serialize, Deserialize)]
#[serde(crate = "dozer_types::serde")]
pub struct MaxAppendOnlyAggregator {
current_state: Field,
return_type: Option<FieldType>,
}

impl MaxAppendOnlyAggregator {
pub fn new() -> Self {
Self {
current_state: Field::Null,
return_type: None,
}
}

pub fn update_state(&mut self, field: Field) {
self.current_state = field;
}
}

impl Aggregator for MaxAppendOnlyAggregator {
fn init(&mut self, return_type: FieldType) {
self.return_type = Some(return_type);
}

fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result<Field, PipelineError> {
Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::GenericError("Append only".to_string()),
))
}

fn delete(&mut self, _old: &[Field]) -> Result<Field, PipelineError> {
Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::GenericError("Append only".to_string()),
))
}

fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
let cur_max = self.current_state.clone();

for val in new {
match self.return_type {
Some(typ) => match typ {
FieldType::UInt => {
let new_val = calculate_err_field!(val.to_uint(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => u64::MIN,
_ => calculate_err_field!(cur_max.to_uint(), MaxAppendOnly, val),
};
if new_val > max_val {
self.update_state(Field::UInt(new_val));
}
}
FieldType::U128 => {
let new_val = calculate_err_field!(val.to_u128(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => u128::MIN,
_ => calculate_err_field!(cur_max.to_u128(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::U128(new_val));
}
}
FieldType::Int => {
let new_val = calculate_err_field!(val.to_int(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => i64::MIN,
_ => calculate_err_field!(cur_max.to_int(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Int(new_val));
}
}
FieldType::I128 => {
let new_val = calculate_err_field!(val.to_i128(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => i128::MIN,
_ => calculate_err_field!(cur_max.to_i128(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::I128(new_val));
}
}
FieldType::Float => {
let new_val = calculate_err_field!(val.to_float(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => f64::MIN,
_ => calculate_err_field!(cur_max.to_float(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Float(OrderedFloat(new_val)));
}
}
FieldType::Decimal => {
let new_val = calculate_err_field!(val.to_decimal(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => Decimal::MIN,
_ => calculate_err_field!(cur_max.to_decimal(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Decimal(new_val));
}
}
FieldType::Timestamp => {
let new_val = calculate_err_field!(val.to_timestamp(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => DateTime::<FixedOffset>::from(DateTime::<Utc>::MIN_UTC),
_ => calculate_err_field!(cur_max.to_timestamp(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Timestamp(new_val));
}
}
FieldType::Date => {
let new_val = calculate_err_field!(val.to_date(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => NaiveDate::MIN,
_ => calculate_err_field!(cur_max.to_date(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Date(new_val));
}
}
FieldType::Duration => {
let new_val = calculate_err_field!(val.to_duration(), MaxAppendOnly, val);
let max_val = match cur_max {
Field::Null => {
DozerDuration(std::time::Duration::ZERO, TimeUnit::Nanoseconds)
}
_ => calculate_err_field!(cur_max.to_duration(), MaxAppendOnly, val),
};

if new_val > max_val {
self.update_state(Field::Duration(new_val));
}
}
FieldType::Boolean
| FieldType::String
| FieldType::Text
| FieldType::Binary
| FieldType::Json
| FieldType::Point => {
return Err(PipelineError::InvalidReturnType(format!(
"Not supported return type {typ} for {MaxAppendOnly}"
)));
}
},
None => {
return Err(PipelineError::InvalidReturnType(format!(
"Not supported None return type for {MaxAppendOnly}"
)))
}
}
}
Ok(self.current_state.clone())
}
}
176 changes: 176 additions & 0 deletions dozer-sql/src/aggregation/min_append_only.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use crate::aggregation::aggregator::{Aggregator};

use crate::errors::{PipelineError, UnsupportedSqlError};
use crate::{calculate_err_field};
use dozer_sql_expression::aggregate::AggregateFunctionType::MinAppendOnly;

use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Utc};
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::rust_decimal::Decimal;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::types::{DozerDuration, Field, FieldType, TimeUnit};

#[derive(Debug, Serialize, Deserialize)]
#[serde(crate = "dozer_types::serde")]
pub struct MinAppendOnlyAggregator {
current_state: Field,
return_type: Option<FieldType>,
}

impl MinAppendOnlyAggregator {
pub fn new() -> Self {
Self {
current_state: Field::Null,
return_type: None,
}
}

pub fn update_state(&mut self, field: Field) {
self.current_state = field;
}
}

impl Aggregator for MinAppendOnlyAggregator {
fn init(&mut self, return_type: FieldType) {
self.return_type = Some(return_type);
}

fn update(&mut self, _old: &[Field], _new: &[Field]) -> Result<Field, PipelineError> {
Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::GenericError("Append only".to_string()),
))
}

fn delete(&mut self, _old: &[Field]) -> Result<Field, PipelineError> {
Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::GenericError("Append only".to_string()),
))
}

fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
let cur_min = self.current_state.clone();

for val in new {
match self.return_type {
Some(typ) => match typ {
FieldType::UInt => {
let new_val = calculate_err_field!(val.to_uint(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => u64::MAX,
_ => calculate_err_field!(cur_min.to_uint(), MinAppendOnly, val),
};
if new_val < min_val {
self.update_state(Field::UInt(new_val));
}
}
FieldType::U128 => {
let new_val = calculate_err_field!(val.to_u128(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => u128::MAX,
_ => calculate_err_field!(cur_min.to_u128(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::U128(new_val));
}
}
FieldType::Int => {
let new_val = calculate_err_field!(val.to_int(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => i64::MAX,
_ => calculate_err_field!(cur_min.to_int(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Int(new_val));
}
}
FieldType::I128 => {
let new_val = calculate_err_field!(val.to_i128(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => i128::MAX,
_ => calculate_err_field!(cur_min.to_i128(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::I128(new_val));
}
}
FieldType::Float => {
let new_val = calculate_err_field!(val.to_float(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => f64::MAX,
_ => calculate_err_field!(cur_min.to_float(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Float(OrderedFloat(new_val)));
}
}
FieldType::Decimal => {
let new_val = calculate_err_field!(val.to_decimal(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => Decimal::MAX,
_ => calculate_err_field!(cur_min.to_decimal(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Decimal(new_val));
}
}
FieldType::Timestamp => {
let new_val = calculate_err_field!(val.to_timestamp(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => DateTime::<FixedOffset>::from(DateTime::<Utc>::MAX_UTC),
_ => calculate_err_field!(cur_min.to_timestamp(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Timestamp(new_val));
}
}
FieldType::Date => {
let new_val = calculate_err_field!(val.to_date(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => NaiveDate::MAX,
_ => calculate_err_field!(cur_min.to_date(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Date(new_val));
}
}
FieldType::Duration => {
let new_val = calculate_err_field!(val.to_duration(), MinAppendOnly, val);
let min_val = match cur_min {
Field::Null => {
DozerDuration(std::time::Duration::MAX, TimeUnit::Nanoseconds)
}
_ => calculate_err_field!(cur_min.to_duration(), MinAppendOnly, val),
};

if new_val < min_val {
self.update_state(Field::Duration(new_val));
}
}
FieldType::Boolean
| FieldType::String
| FieldType::Text
| FieldType::Binary
| FieldType::Json
| FieldType::Point => {
return Err(PipelineError::InvalidReturnType(format!(
"Not supported return type {typ} for {MinAppendOnly}"
)));
}
},
None => {
return Err(PipelineError::InvalidReturnType(format!(
"Not supported None return type for {MinAppendOnly}"
)))
}
}
}
Ok(self.current_state.clone())
}
}
Loading

0 comments on commit 9baff08

Please sign in to comment.