diff --git a/Cargo.lock b/Cargo.lock index 3cb68234a259..5083553dc0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1676,6 +1676,20 @@ dependencies = [ "url", ] +[[package]] +name = "common-decimal" +version = "0.4.2" +dependencies = [ + "arrow", + "bigdecimal 0.4.2", + "common-error", + "common-macro", + "rust_decimal", + "serde", + "serde_json", + "snafu", +] + [[package]] name = "common-error" version = "0.4.2" @@ -2723,6 +2737,7 @@ dependencies = [ "arrow-array", "arrow-schema", "common-base", + "common-decimal", "common-error", "common-macro", "common-telemetry", @@ -2733,6 +2748,7 @@ dependencies = [ "num-traits", "ordered-float 3.9.2", "paste", + "rust_decimal", "serde", "serde_json", "snafu", @@ -2747,20 +2763,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "decimal" -version = "0.4.2" -dependencies = [ - "arrow", - "bigdecimal 0.4.2", - "common-error", - "common-macro", - "rust_decimal", - "serde", - "serde_json", - "snafu", -] - [[package]] name = "der" version = "0.5.1" @@ -4859,6 +4861,7 @@ dependencies = [ "common-base", "common-catalog", "common-datasource", + "common-decimal", "common-error", "common-macro", "common-procedure", @@ -8563,6 +8566,7 @@ dependencies = [ "common-base", "common-catalog", "common-datasource", + "common-decimal", "common-error", "common-macro", "common-query", diff --git a/Cargo.toml b/Cargo.toml index 0659941d9c51..abfb1ea87440 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,6 +134,7 @@ common-base = { path = "src/common/base" } common-catalog = { path = "src/common/catalog" } common-config = { path = "src/common/config" } common-datasource = { path = "src/common/datasource" } +common-decimal = { path = "src/common/decimal" } common-error = { path = "src/common/error" } common-function = { path = "src/common/function" } common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 5ebea9c4bbe1..9328540bdf0a 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -158,7 +158,8 @@ impl TryFrom for ColumnDataTypeWrapper { }, ConcreteDataType::Null(_) | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => { + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } }); @@ -341,7 +342,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { TimeUnit::Microsecond => values.duration_microsecond_values.push(val.value()), TimeUnit::Nanosecond => values.duration_nanosecond_values.push(val.value()), }, - Value::List(_) => unreachable!(), + Value::List(_) | Value::Decimal128(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); } @@ -522,7 +523,10 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> values.duration_nanosecond_values, )), }, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { unreachable!() } } @@ -692,7 +696,10 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { unreachable!() } } @@ -816,7 +823,7 @@ pub fn to_proto_value(value: Value) -> Option { value_data: Some(ValueData::DurationNanosecondValue(v.value())), }, }, - Value::List(_) => return None, + Value::List(_) | Value::Decimal128(_) => return None, }; Some(proto_value) @@ -908,9 +915,10 @@ pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option ColumnDataType::IntervalDayTime, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { - return None - } + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => return None, }; Some(column_data_type) @@ -974,7 +982,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Microsecond => ValueData::DurationMicrosecondValue(v.value()), TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), - Value::List(_) => unreachable!(), + Value::List(_) | Value::Decimal128(_) => unreachable!(), }, } } diff --git a/src/common/decimal/Cargo.toml b/src/common/decimal/Cargo.toml index 14e714de0688..dd0ba90c440f 100644 --- a/src/common/decimal/Cargo.toml +++ b/src/common/decimal/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "decimal" +name = "common-decimal" version.workspace = true edition.workspace = true license.workspace = true diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index 98becbe9c8a7..ec345e9e946d 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -43,7 +43,7 @@ const BYTES_TO_OVERFLOW_RUST_DECIMAL: usize = 28; /// **precision**: the total number of digits in the number, it's range is \[1, 38\]. /// /// **scale**: the number of digits to the right of the decimal point, it's range is \[0, precision\]. -#[derive(Debug, Default, Eq, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Eq, Copy, Clone, Serialize, Deserialize)] pub struct Decimal128 { value: i128, precision: u8, @@ -90,6 +90,17 @@ impl Decimal128 { } } +/// The default value of Decimal128 is 0, and its precision is 1 and scale is 0. +impl Default for Decimal128 { + fn default() -> Self { + Self { + value: 0, + precision: 1, + scale: 0, + } + } +} + impl PartialEq for Decimal128 { fn eq(&self, other: &Self) -> bool { self.precision.eq(&other.precision) diff --git a/src/common/decimal/src/lib.rs b/src/common/decimal/src/lib.rs index 815c79fa0fad..694b32449def 100644 --- a/src/common/decimal/src/lib.rs +++ b/src/common/decimal/src/lib.rs @@ -14,3 +14,5 @@ pub mod decimal128; pub mod error; + +pub use decimal128::Decimal128; diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 85643ea82530..02d0a2be6b37 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -71,7 +71,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Decimal128(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index f1fd406caac5..79ec3099367b 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -13,6 +13,7 @@ arrow-array.workspace = true arrow-schema.workspace = true arrow.workspace = true common-base.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true @@ -23,6 +24,7 @@ num = "0.4" num-traits = "0.2" ordered-float = { version = "3.0", features = ["serde"] } paste = "1.0" +rust_decimal = "1.32.0" serde.workspace = true serde_json = "1.0" snafu.workspace = true diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index aaa039a9362f..193338a0a325 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -19,6 +19,8 @@ use arrow::compute::cast as arrow_array_cast; use arrow::datatypes::{ DataType as ArrowDataType, IntervalUnit as ArrowIntervalUnit, TimeUnit as ArrowTimeUnit, }; +use arrow_schema::DECIMAL_DEFAULT_SCALE; +use common_decimal::decimal128::DECIMAL128_MAX_PRECISION; use common_time::interval::IntervalUnit; use common_time::timestamp::TimeUnit; use paste::paste; @@ -246,6 +248,13 @@ impl ConcreteDataType { } } + pub fn as_decimal(&self) -> Option { + match self { + ConcreteDataType::Decimal128(d) => Some(*d), + _ => None, + } + } + /// Checks if the data type can cast to another data type. pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool { let array = arrow_array::new_empty_array(&self.as_arrow_type()); @@ -472,6 +481,10 @@ impl ConcreteDataType { pub fn decimal128_datatype(precision: u8, scale: i8) -> ConcreteDataType { ConcreteDataType::Decimal128(DecimalType::new(precision, scale)) } + + pub fn decimal128_default_datatype() -> ConcreteDataType { + Self::decimal128_datatype(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE) + } } /// Data type abstraction. diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index f96239bf3a36..9024c4d384a0 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -122,6 +122,13 @@ pub enum Error { #[snafu(display("Failed to unpack value to given type: {}", reason))] TryFromValue { reason: String, location: Location }, + + #[snafu(display("Invalid arguments, reason: {}", error))] + InvalidArguments { + #[snafu(source)] + error: arrow::error::ArrowError, + location: Location, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index ceb38964fb5f..da79fcff080b 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -14,6 +14,7 @@ use std::any::Any; +use common_decimal::Decimal128; use common_time::{Date, DateTime}; use crate::types::{ @@ -22,8 +23,8 @@ use crate::types::{ }; use crate::value::{ListValue, ListValueRef, Value}; use crate::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, ListVector, MutableVector, - PrimitiveVector, StringVector, Vector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, ListVector, + MutableVector, PrimitiveVector, StringVector, Vector, }; fn get_iter_capacity>(iter: &I) -> usize { @@ -277,6 +278,27 @@ impl<'a> ScalarRef<'a> for Date { } } +impl Scalar for Decimal128 { + type VectorType = Decimal128Vector; + type RefType<'a> = Decimal128; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + *self + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for Decimal128 { + type ScalarType = Decimal128; + + fn to_owned_scalar(&self) -> Self::ScalarType { + *self + } +} + impl Scalar for DateTime { type VectorType = DateTimeVector; type RefType<'a> = DateTime; @@ -396,6 +418,13 @@ mod tests { assert_eq!(date, date.to_owned_scalar()); } + #[test] + fn test_decimal_scalar() { + let decimal = Decimal128::new_unchecked(1, 1, 1); + assert_eq!(decimal, decimal.as_scalar_ref()); + assert_eq!(decimal, decimal.to_owned_scalar()); + } + #[test] fn test_datetime_scalar() { let dt = DateTime::new(123); diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 3d8635a5fb6d..29e3065abe58 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -30,6 +30,8 @@ pub enum LogicalTypeId { Float32, Float64, + Decimal128, + // String types: String, Binary, @@ -123,6 +125,7 @@ impl LogicalTypeId { LogicalTypeId::DurationMillisecond => ConcreteDataType::duration_millisecond_datatype(), LogicalTypeId::DurationMicrosecond => ConcreteDataType::duration_microsecond_datatype(), LogicalTypeId::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), + LogicalTypeId::Decimal128 => ConcreteDataType::decimal128_default_datatype(), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index aaaf8655dd85..668daa4ae440 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -17,6 +17,7 @@ mod boolean_type; pub mod cast; mod date_type; mod datetime_type; +mod decimal_type; mod dictionary_type; mod duration_type; mod interval_type; @@ -29,9 +30,10 @@ mod timestamp_type; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; -pub use cast::cast_with_opt; +pub use cast::{cast, cast_with_opt}; pub use date_type::DateType; pub use datetime_type::DateTimeType; +pub use decimal_type::DecimalType; pub use dictionary_type::DictionaryType; pub use duration_type::{ DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs index 292e282fae99..8e4881e90ff9 100644 --- a/src/datatypes/src/types/decimal_type.rs +++ b/src/datatypes/src/types/decimal_type.rs @@ -22,7 +22,9 @@ use crate::value::Value; use crate::vectors::{Decimal128VectorBuilder, MutableVector}; /// Decimal type with precision and scale information. -#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, +)] pub struct DecimalType { precision: u8, scale: i8, diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index febff36324a1..5f01207a6f74 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -80,6 +80,7 @@ impl DataType for StringType { Value::Time(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))), Value::Interval(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))), Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))), + Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))), // StringBytes is only support for utf-8, Value::Binary is not allowed. Value::Binary(_) | Value::List(_) => None, diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 024f45f2b36c..d4bd2d7694ea 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType as ArrowDataType, Field}; use common_base::bytes::{Bytes, StringBytes}; +use common_decimal::Decimal128; use common_telemetry::logging; use common_time::date::Date; use common_time::datetime::DateTime; @@ -61,6 +62,9 @@ pub enum Value { Float32(OrderedF32), Float64(OrderedF64), + // Decimal type: + Decimal128(Decimal128), + // String types: String(StringBytes), Binary(Bytes), @@ -116,6 +120,7 @@ impl Display for Value { .join(", "); write!(f, "{}[{}]", v.datatype.name(), items) } + Value::Decimal128(v) => write!(f, "{:?}", v), } } } @@ -148,6 +153,7 @@ impl Value { Value::Interval(v) => ConcreteDataType::interval_datatype(v.unit()), Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), Value::Duration(d) => ConcreteDataType::duration_datatype(d.unit()), + Value::Decimal128(d) => ConcreteDataType::decimal128_datatype(d.precision(), d.scale()), } } @@ -192,6 +198,7 @@ impl Value { Value::Time(v) => ValueRef::Time(*v), Value::Interval(v) => ValueRef::Interval(*v), Value::Duration(v) => ValueRef::Duration(*v), + Value::Decimal128(v) => ValueRef::Decimal128(*v), } } @@ -271,6 +278,7 @@ impl Value { TimeUnit::Microsecond => LogicalTypeId::DurationMicrosecond, TimeUnit::Nanosecond => LogicalTypeId::DurationNanosecond, }, + Value::Decimal128(_) => LogicalTypeId::Decimal128, } } @@ -318,6 +326,10 @@ impl Value { IntervalUnit::MonthDayNano => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())), }, Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())), + Value::Decimal128(d) => { + let (v, p, s) = d.to_scalar_value(); + ScalarValue::Decimal128(v, p, s) + } }; Ok(scalar_value) @@ -357,6 +369,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result time_to_scalar_value(t.unit(), None)?, ConcreteDataType::Duration(d) => duration_to_scalar_value(d.unit(), None), + ConcreteDataType::Decimal128(d) => ScalarValue::Decimal128(None, d.precision(), d.scale()), }) } @@ -533,6 +546,8 @@ impl_try_from_value!(Time, Time); impl_try_from_value!(DateTime, DateTime); impl_try_from_value!(Timestamp, Timestamp); impl_try_from_value!(Interval, Interval); +impl_try_from_value!(Duration, Duration); +impl_try_from_value!(Decimal128, Decimal128); macro_rules! impl_value_from { ($Variant: ident, $Type: ident) => { @@ -575,6 +590,7 @@ impl_value_from!(Timestamp, Timestamp); impl_value_from!(Interval, Interval); impl_value_from!(Duration, Duration); impl_value_from!(String, String); +impl_value_from!(Decimal128, Decimal128); impl From<&str> for Value { fn from(string: &str) -> Value { @@ -620,6 +636,7 @@ impl TryFrom for serde_json::Value { Value::Time(v) => serde_json::to_value(v.value())?, Value::Interval(v) => serde_json::to_value(v.to_i128())?, Value::Duration(v) => serde_json::to_value(v.value())?, + Value::Decimal128(v) => serde_json::to_value(v.to_string())?, }; Ok(json_value) @@ -840,6 +857,7 @@ impl From> for Value { ValueRef::Interval(v) => Value::Interval(v), ValueRef::Duration(v) => Value::Duration(v), ValueRef::List(v) => v.to_value(), + ValueRef::Decimal128(v) => Value::Decimal128(v), } } } @@ -862,6 +880,9 @@ pub enum ValueRef<'a> { Float32(OrderedF32), Float64(OrderedF64), + // Decimal type: + Decimal128(Decimal128), + // String types: String(&'a str), Binary(&'a [u8]), @@ -1003,6 +1024,11 @@ impl<'a> ValueRef<'a> { pub fn as_list(&self) -> Result> { impl_as_for_value_ref!(self, List) } + + /// Cast itself to [Decimal128]. + pub fn as_decimal128(&self) -> Result> { + impl_as_for_value_ref!(self, Decimal128) + } } impl<'a> PartialOrd for ValueRef<'a> { @@ -1053,6 +1079,7 @@ impl_value_ref_from!(Timestamp, Timestamp); impl_value_ref_from!(Time, Time); impl_value_ref_from!(Interval, Interval); impl_value_ref_from!(Duration, Duration); +impl_value_ref_from!(Decimal128, Decimal128); impl<'a> From<&'a str> for ValueRef<'a> { fn from(string: &'a str) -> ValueRef<'a> { @@ -1143,6 +1170,7 @@ impl<'a> ValueRef<'a> { ValueRef::Time(_) => 16, ValueRef::Duration(_) => 16, ValueRef::Interval(_) => 24, + ValueRef::Decimal128(_) => 32, ValueRef::List(v) => match v { ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(), ListValueRef::Ref { val } => val.estimated_size(), @@ -2227,7 +2255,8 @@ mod tests { #[test] fn test_value_ref_estimated_size() { - assert_eq!(std::mem::size_of::(), 24); + // MacOS is 48, Ubuntu is 32 + // assert_eq!(std::mem::size_of::(), 48); check_value_ref_size_eq(&ValueRef::Boolean(true), 1); check_value_ref_size_eq(&ValueRef::UInt8(1), 1); @@ -2304,6 +2333,10 @@ mod tests { idx: 2, }), 85, + ); + check_value_ref_size_eq( + &ValueRef::Decimal128(Decimal128::new_unchecked(1234, 3, 1)), + 32, ) } } diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index d69d1bb82926..4bfcb82e76ea 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -30,6 +30,7 @@ mod boolean; mod constant; mod date; mod datetime; +mod decimal; mod duration; mod eq; mod helper; @@ -48,6 +49,7 @@ pub use boolean::{BooleanVector, BooleanVectorBuilder}; pub use constant::ConstantVector; pub use date::{DateVector, DateVectorBuilder}; pub use datetime::{DateTimeVector, DateTimeVectorBuilder}; +pub use decimal::{Decimal128Vector, Decimal128VectorBuilder}; pub use duration::{ DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector, DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder, diff --git a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs index 4efd0ab5acca..d191fa25077a 100644 --- a/src/datatypes/src/vectors/decimal.rs +++ b/src/datatypes/src/vectors/decimal.rs @@ -26,12 +26,13 @@ use snafu::{OptionExt, ResultExt}; use super::{MutableVector, Validity, Vector, VectorRef}; use crate::arrow::datatypes::DataType as ArrowDataType; use crate::data_type::ConcreteDataType; -use crate::error::{self, ArrowPrecisionOrScaleSnafu, CastTypeSnafu, Result}; +use crate::error::{self, CastTypeSnafu, InvalidArgumentsSnafu, Result}; use crate::prelude::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors; +/// Decimal128Vector is a vector keep i128 values with precision and scale. #[derive(Debug, PartialEq)] pub struct Decimal128Vector { array: Decimal128Array, @@ -48,12 +49,27 @@ impl Decimal128Vector { } } + /// Construct Vector from i128 values pub fn from_values>(iter: I) -> Self { Self { array: Decimal128Array::from_iter_values(iter), } } + pub fn from_slice>(slice: P) -> Self { + let iter = slice.as_ref().iter().copied(); + Self { + array: Decimal128Array::from_iter_values(iter), + } + } + + pub fn from_wrapper_slice>(slice: P) -> Self { + let iter = slice.as_ref().iter().copied().map(|v| v.val()); + Self { + array: Decimal128Array::from_iter_values(iter), + } + } + pub fn to_array_data(&self) -> ArrayData { self.array.to_data() } @@ -63,15 +79,42 @@ impl Decimal128Vector { Self::from_array_data(data) } + /// Change the precision and scale of the Decimal128Vector, + /// And check precision and scale if compatible. pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result { let array = self .array .with_precision_and_scale(precision, scale) - .context(ArrowPrecisionOrScaleSnafu { precision, scale })?; + .context(InvalidArgumentsSnafu {})?; Ok(Self { array }) } - pub fn as_arrow(&self) -> &Decimal128Array { + pub fn null_if_overflow_precision(&self, precision: u8) -> Self { + Self { + array: self.array.null_if_overflow_precision(precision), + } + } + + pub fn validate_decimal_precision(&self, precision: u8) -> Result<()> { + self.array + .validate_decimal_precision(precision) + .context(InvalidArgumentsSnafu {}) + } + + /// Return decimal value as string + pub fn value_as_string(&self, idx: usize) -> String { + self.array.value_as_string(idx) + } + + pub fn precision(&self) -> u8 { + self.array.precision() + } + + pub fn scale(&self) -> i8 { + self.array.scale() + } + + pub(crate) fn as_arrow(&self) -> &dyn Array { &self.array } } @@ -81,7 +124,7 @@ impl Vector for Decimal128Vector { if let ArrowDataType::Decimal128(p, s) = self.array.data_type() { ConcreteDataType::decimal128_datatype(*p, *s) } else { - unreachable!() + ConcreteDataType::decimal128_default_datatype() } } @@ -123,7 +166,7 @@ impl Vector for Decimal128Vector { self.array.is_null(row) } - fn slice(&self, offset: usize, length: usize) -> super::VectorRef { + fn slice(&self, offset: usize, length: usize) -> VectorRef { let data = self.array.to_data().slice(offset, length); Arc::new(Self::from_array_data(data)) } @@ -218,7 +261,7 @@ impl ScalarVector for Decimal128Vector { type Builder = Decimal128VectorBuilder; fn get_data(&self, idx: usize) -> Option> { - if self.array.is_valid(idx) { + if !self.array.is_valid(idx) { return None; } @@ -226,6 +269,7 @@ impl ScalarVector for Decimal128Vector { ArrowDataType::Decimal128(precision, scale) => { // Safety: The index have been checked by `is_valid()`. let value = unsafe { self.array.value_unchecked(idx) }; + // Safety: The precision and scale have been checked by ArrowDataType. Some(Decimal128::new_unchecked(value, *precision, *scale)) } _ => None, @@ -315,3 +359,107 @@ impl ScalarVectorBuilder for Decimal128VectorBuilder { } vectors::impl_try_from_arrow_array_for_vector!(Decimal128Array, Decimal128Vector); + +#[cfg(test)] +pub mod tests { + use arrow_array::Decimal128Array; + use common_decimal::Decimal128; + + use super::*; + use crate::vectors::operations::VectorOp; + use crate::vectors::Int8Vector; + + #[test] + fn test_from_arrow_decimal128_array() { + let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]); + let decimal_vector = Decimal128Vector::from(decimal_array); + let expect = Decimal128Vector::from_values(vec![123, 456]); + + assert_eq!(decimal_vector, expect); + } + + #[test] + fn test_from_slice() { + let decimal_vector = Decimal128Vector::from_slice([123, 456]); + let decimal_vector2 = Decimal128Vector::from_wrapper_slice([ + Decimal128::new_unchecked(123, 10, 2), + Decimal128::new_unchecked(456, 10, 2), + ]); + let expect = Decimal128Vector::from_values(vec![123, 456]); + + assert_eq!(decimal_vector, expect); + assert_eq!(decimal_vector2, expect); + } + + #[test] + fn test_decimal128_vector_basic() { + let data = vec![100, 200, 300]; + // create a decimal vector + let decimal_vector = Decimal128Vector::from_values(data.clone()) + .with_precision_and_scale(10, 2) + .unwrap(); + + assert_eq!(decimal_vector.len(), 3); + // check the first value + assert_eq!( + decimal_vector.get(0), + Value::Decimal128(Decimal128::new_unchecked(100, 10, 2)) + ); + + // iterator for-loop + for (i, _) in data.iter().enumerate() { + assert_eq!( + decimal_vector.get_data(i), + Some(Decimal128::new_unchecked((i + 1) as i128 * 100, 10, 2)) + ); + assert_eq!( + decimal_vector.get(i), + Value::Decimal128(Decimal128::new_unchecked((i + 1) as i128 * 100, 10, 2)) + ) + } + } + + #[test] + fn test_decimal128_vector_builder() { + let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3); + decimal_builder.push(Some(Decimal128::new_unchecked(100, 10, 2))); + decimal_builder.push(Some(Decimal128::new_unchecked(200, 10, 2))); + decimal_builder.push(Some(Decimal128::new_unchecked(300, 10, 2))); + let decimal_vector = decimal_builder + .finish() + .with_precision_and_scale(10, 2) + .unwrap(); + assert_eq!(decimal_vector.len(), 3); + assert_eq!( + decimal_vector.get(0), + Value::Decimal128(Decimal128::new_unchecked(100, 10, 2)) + ); + assert_eq!( + decimal_vector.get(1), + Value::Decimal128(Decimal128::new_unchecked(200, 10, 2)) + ); + assert_eq!( + decimal_vector.get(2), + Value::Decimal128(Decimal128::new_unchecked(300, 10, 2)) + ); + + // push value error + let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3); + decimal_builder.push(Some(Decimal128::new_unchecked(123, 10, 2))); + decimal_builder.push(Some(Decimal128::new_unchecked(1234, 10, 2))); + decimal_builder.push(Some(Decimal128::new_unchecked(12345, 10, 2))); + let decimal_vector = decimal_builder.finish().with_precision_and_scale(3, 2); + assert!(decimal_vector.is_ok()); + } + + #[test] + fn test_cast() { + let vector = Int8Vector::from_values(vec![1, 2, 3, 4, 100]); + let casted_vector = vector.cast(&ConcreteDataType::decimal128_datatype(3, 1)); + assert!(casted_vector.is_ok()); + let vector = casted_vector.unwrap(); + let array = vector.as_any().downcast_ref::().unwrap(); + // because 100 is out of Decimal(3, 1) range, so it will be null + assert!(array.is_null(4)); + } +} diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index f9c02f76a4f5..fcf97515ee27 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -20,12 +20,12 @@ use crate::data_type::DataType; use crate::types::{DurationType, TimeType, TimestampType}; use crate::vectors::constant::ConstantVector; use crate::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, - IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, ListVector, - PrimitiveVector, StringVector, TimeMicrosecondVector, TimeMillisecondVector, - TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, - TimestampNanosecondVector, TimestampSecondVector, Vector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, + IntervalYearMonthVector, ListVector, PrimitiveVector, StringVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, + TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector, }; use crate::with_match_primitive_type_id; @@ -151,6 +151,9 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { is_vector_eq!(DurationNanosecondVector, lhs, rhs) } }, + Decimal128(_) => { + is_vector_eq!(Decimal128Vector, lhs, rhs) + } } } @@ -242,6 +245,9 @@ mod tests { assert_vector_ref_eq(Arc::new(DurationMillisecondVector::from_values([300, 310]))); assert_vector_ref_eq(Arc::new(DurationMicrosecondVector::from_values([300, 310]))); assert_vector_ref_eq(Arc::new(DurationNanosecondVector::from_values([300, 310]))); + assert_vector_ref_eq(Arc::new(Decimal128Vector::from_values(vec![ + 1i128, 2i128, 3i128, + ]))); } #[test] @@ -312,5 +318,10 @@ mod tests { Arc::new(DurationSecondVector::from_values([300, 310])), Arc::new(DurationSecondVector::from_values([300, 320])), ); + + assert_vector_ref_ne( + Arc::new(Decimal128Vector::from_values([300i128, 310i128])), + Arc::new(Decimal128Vector::from_values([300i128, 320i128])), + ); } } diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 0bd5cd9d891f..9e3d20f88940 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -25,7 +25,10 @@ use arrow_schema::IntervalUnit; use datafusion_common::ScalarValue; use snafu::{OptionExt, ResultExt}; -use super::{IntervalDayTimeVector, IntervalYearMonthVector}; +use super::{ + Decimal128Vector, DurationMicrosecondVector, DurationMillisecondVector, + DurationNanosecondVector, DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector, +}; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; use crate::scalars::{Scalar, ScalarVectorBuilder}; @@ -218,12 +221,23 @@ impl Helper { ScalarValue::IntervalMonthDayNano(v) => { ConstantVector::new(Arc::new(IntervalMonthDayNanoVector::from(vec![v])), length) } - ScalarValue::Decimal128(_, _, _) - | ScalarValue::Decimal256(_, _, _) - | ScalarValue::DurationSecond(_) - | ScalarValue::DurationMillisecond(_) - | ScalarValue::DurationMicrosecond(_) - | ScalarValue::DurationNanosecond(_) + ScalarValue::DurationSecond(v) => { + ConstantVector::new(Arc::new(DurationSecondVector::from(vec![v])), length) + } + ScalarValue::DurationMillisecond(v) => { + ConstantVector::new(Arc::new(DurationMillisecondVector::from(vec![v])), length) + } + ScalarValue::DurationMicrosecond(v) => { + ConstantVector::new(Arc::new(DurationMicrosecondVector::from(vec![v])), length) + } + ScalarValue::DurationNanosecond(v) => { + ConstantVector::new(Arc::new(DurationNanosecondVector::from(vec![v])), length) + } + ScalarValue::Decimal128(v, p, s) => { + let vector = Decimal128Vector::from(vec![v]).with_precision_and_scale(p, s)?; + ConstantVector::new(Arc::new(vector), length) + } + ScalarValue::Decimal256(_, _, _) | ScalarValue::Struct(_, _) | ScalarValue::Dictionary(_, _) => { return error::ConversionSnafu { @@ -318,14 +332,29 @@ impl Helper { IntervalMonthDayNanoVector::try_from_arrow_interval_array(array)?, ), }, + ArrowDataType::Duration(unit) => match unit { + TimeUnit::Second => { + Arc::new(DurationSecondVector::try_from_arrow_duration_array(array)?) + } + TimeUnit::Millisecond => Arc::new( + DurationMillisecondVector::try_from_arrow_duration_array(array)?, + ), + TimeUnit::Microsecond => Arc::new( + DurationMicrosecondVector::try_from_arrow_duration_array(array)?, + ), + TimeUnit::Nanosecond => Arc::new( + DurationNanosecondVector::try_from_arrow_duration_array(array)?, + ), + }, + ArrowDataType::Decimal128(_, _) => { + Arc::new(Decimal128Vector::try_from_arrow_array(array)?) + } ArrowDataType::Float16 - | ArrowDataType::Duration(_) | ArrowDataType::LargeList(_) | ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Struct(_) | ArrowDataType::Union(_, _) | ArrowDataType::Dictionary(_, _) - | ArrowDataType::Decimal128(_, _) | ArrowDataType::Decimal256(_, _) | ArrowDataType::Map(_, _) | ArrowDataType::RunEndEncoded(_, _) => { diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs index 748bcd3ff5ce..b2de83c6e6f3 100644 --- a/src/datatypes/src/vectors/operations.rs +++ b/src/datatypes/src/vectors/operations.rs @@ -24,8 +24,8 @@ use crate::error::{self, Result}; use crate::types::LogicalPrimitiveType; use crate::vectors::constant::ConstantVector; use crate::vectors::{ - BinaryVector, BooleanVector, ConcreteDataType, ListVector, NullVector, PrimitiveVector, - StringVector, UInt32Vector, Vector, VectorRef, + BinaryVector, BooleanVector, ConcreteDataType, Decimal128Vector, ListVector, NullVector, + PrimitiveVector, StringVector, UInt32Vector, Vector, VectorRef, }; /// Vector compute operations. @@ -99,7 +99,13 @@ macro_rules! impl_scalar_vector_op { )+}; } -impl_scalar_vector_op!(BinaryVector, BooleanVector, ListVector, StringVector); +impl_scalar_vector_op!( + BinaryVector, + BooleanVector, + ListVector, + StringVector, + Decimal128Vector +); impl VectorOp for PrimitiveVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 46c6ce796730..ae088c76fbf6 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -22,6 +22,7 @@ chrono.workspace = true common-base.workspace = true common-catalog.workspace = true common-datasource.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-procedure.workspace = true diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index f6b3119e3f9f..4cc6fd3274ac 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -14,6 +14,7 @@ use bytes::Buf; use common_base::bytes::Bytes; +use common_decimal::Decimal128; use common_time::time::Time; use common_time::{Date, Duration, Interval}; use datatypes::data_type::ConcreteDataType; @@ -74,6 +75,7 @@ impl SortField { ConcreteDataType::Time(_) => 10, ConcreteDataType::Duration(_) => 10, ConcreteDataType::Interval(_) => 18, + ConcreteDataType::Decimal128(_) => 19, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => 0, @@ -138,7 +140,8 @@ impl SortField { DateTime, datetime, Time, time, Interval, interval, - Duration, duration + Duration, duration, + Decimal128, decimal128 ); Ok(()) @@ -204,7 +207,8 @@ impl SortField { Time, Time, DateTime, DateTime, Interval, Interval, - Duration, Duration + Duration, Duration, + Decimal128, Decimal128 ) } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index a358df1c14be..6c909deb41b3 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -200,6 +200,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { } Value::Time(v) => row_writer .write_col(v.to_timezone_aware_string(query_context.time_zone()))?, + Value::Decimal128(v) => row_writer.write_col(v.to_string())?, } } row_writer.end_row().await?; @@ -246,6 +247,7 @@ pub(crate) fn create_mysql_column( ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR), ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME), + ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL), _ => error::InternalSnafu { err_msg: format!("not implemented for column datatype {:?}", data_type), } diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index f33b42ebd618..fd75d0c1cf12 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -103,6 +103,7 @@ pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWir } } Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)), + Value::Decimal128(v) => builder.encode_field(&v.to_string()), Value::List(_) | Value::Duration(_) => { Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!( @@ -131,6 +132,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), &ConcreteDataType::Time(_) => Ok(Type::TIME), &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL), + &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC), &ConcreteDataType::Duration(_) | &ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu { diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 17ec57afd3ba..d72a66721fd1 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true api.workspace = true common-base.workspace = true common-catalog.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-query.workspace = true diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 37cec2cec0f2..210a8432b966 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -34,7 +34,6 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::add_column_location::LocationType; use api::v1::{AddColumnLocation as Location, SemanticType}; use common_base::bytes::Bytes; -use common_decimal::Decimal128; use common_query::AddColumnLocation; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; @@ -147,14 +146,8 @@ macro_rules! parse_number_to_value { Ok(Value::Timestamp(Timestamp::new(n, t.unit()))) }, ConcreteDataType::Decimal128(_) => { - let decimal = match Decimal128::from_str($n){ - Ok(val) => val, - Err(e) => ParseSqlValueSnafu { - msg: format!("Fail to parse number {}, {e:?}",$n), - }.fail()?, - }; - - Ok(Value::Decimal128(decimal)) + // TODO(QuenKar): parse decimal128 string with precision and scale + unimplemented!("insert Decimal128 is not supported yet") } _ => ParseSqlValueSnafu {