diff --git a/Cargo.lock b/Cargo.lock index da0b43f05d94..278ed2f85772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,6 +199,7 @@ name = "api" version = "0.4.3" dependencies = [ "common-base", + "common-decimal", "common-error", "common-macro", "common-time", @@ -3536,7 +3537,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11efce55d8ce20257e08842e4f4c1c8fce2b3a8#a11efce55d8ce20257e08842e4f4c1c8fce2b3a8" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index ba46247cf922..4c3c78eef2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11efce55d8ce20257e08842e4f4c1c8fce2b3a8" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index b30989625f73..678b5321c00a 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -152,6 +152,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec, u32) { .unwrap_or_default(), datatype: datatype.into(), semantic_type: semantic_type as i32, + ..Default::default() }; columns.push(column); } @@ -266,6 +267,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tpep_pickup_datetime".to_string(), @@ -274,6 +276,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tpep_dropoff_datetime".to_string(), @@ -282,6 +285,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "passenger_count".to_string(), @@ -290,6 +294,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "trip_distance".to_string(), @@ -298,6 +303,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "RatecodeID".to_string(), @@ -306,6 +312,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "store_and_fwd_flag".to_string(), @@ -314,6 +321,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "PULocationID".to_string(), @@ -322,6 +330,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "DOLocationID".to_string(), @@ -330,6 +339,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "payment_type".to_string(), @@ -338,6 +348,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "fare_amount".to_string(), @@ -346,6 +357,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "extra".to_string(), @@ -354,6 +366,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "mta_tax".to_string(), @@ -362,6 +375,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tip_amount".to_string(), @@ -370,6 +384,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tolls_amount".to_string(), @@ -378,6 +393,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "improvement_surcharge".to_string(), @@ -386,6 +402,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "total_amount".to_string(), @@ -394,6 +411,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "congestion_surcharge".to_string(), @@ -402,6 +420,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "airport_fee".to_string(), @@ -410,6 +429,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "tpep_pickup_datetime".to_string(), diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7c1ff3e04ba6..9beea1ff51b9 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -6,11 +6,13 @@ license.workspace = true [dependencies] common-base.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-time.workspace = true datatypes.workspace = true greptime-proto.workspace = true +paste = "1.0" prost.workspace = true snafu.workspace = true tonic.workspace = true diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 9328540bdf0a..40b8d1533125 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use common_base::BitVec; +use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION}; +use common_decimal::Decimal128; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::TimeUnit; @@ -26,47 +28,71 @@ use datatypes::types::{ }; use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, - Float64Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, - IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, - UInt64Vector, VectorRef, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, Float32Vector, Float64Vector, Int32Vector, Int64Vector, + IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, PrimitiveVector, + StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, + TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, VectorRef, }; +use greptime_proto::v1; +use greptime_proto::v1::column_data_type_extension::TypeExt; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; -use greptime_proto::v1::{self, DdlRequest, IntervalMonthDayNano, QueryRequest, Row, SemanticType}; +use greptime_proto::v1::{ + ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, SemanticType, +}; +use paste::paste; use snafu::prelude::*; use crate::error::{self, Result}; use crate::v1::column::Values; use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; -#[derive(Debug, PartialEq, Eq)] -pub struct ColumnDataTypeWrapper(ColumnDataType); +/// ColumnDataTypeWrapper is a wrapper of ColumnDataType and ColumnDataTypeExtension. +/// It could be used to convert with ConcreteDataType. +#[derive(Debug, PartialEq)] +pub struct ColumnDataTypeWrapper { + datatype: ColumnDataType, + datatype_ext: Option, +} impl ColumnDataTypeWrapper { - pub fn try_new(datatype: i32) -> Result { + /// Try to create a ColumnDataTypeWrapper from i32(ColumnDataType) and ColumnDataTypeExtension. + pub fn try_new(datatype: i32, datatype_ext: Option) -> Result { let datatype = ColumnDataType::try_from(datatype) .context(error::UnknownColumnDataTypeSnafu { datatype })?; - Ok(Self(datatype)) + Ok(Self { + datatype, + datatype_ext, + }) } - pub fn new(datatype: ColumnDataType) -> Self { - Self(datatype) + /// Create a ColumnDataTypeWrapper from ColumnDataType and ColumnDataTypeExtension. + pub fn new(datatype: ColumnDataType, datatype_ext: Option) -> Self { + Self { + datatype, + datatype_ext, + } } + /// Get the ColumnDataType. pub fn datatype(&self) -> ColumnDataType { - self.0 + self.datatype + } + + /// Get a tuple of ColumnDataType and ColumnDataTypeExtension. + pub fn to_parts(&self) -> (ColumnDataType, Option) { + (self.datatype, self.datatype_ext.clone()) } } impl From for ConcreteDataType { - fn from(datatype: ColumnDataTypeWrapper) -> Self { - match datatype.0 { + fn from(datatype_wrapper: ColumnDataTypeWrapper) -> Self { + match datatype_wrapper.datatype { ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(), ColumnDataType::Int8 => ConcreteDataType::int8_datatype(), ColumnDataType::Int16 => ConcreteDataType::int16_datatype(), @@ -109,6 +135,100 @@ impl From for ConcreteDataType { ConcreteDataType::duration_microsecond_datatype() } ColumnDataType::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), + ColumnDataType::Decimal128 => { + if let Some(TypeExt::DecimalType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + ConcreteDataType::decimal128_datatype(d.precision as u8, d.scale as i8) + } else { + ConcreteDataType::decimal128_default_datatype() + } + } + } + } +} + +/// This macro is used to generate datatype functions +/// with lower style for ColumnDataTypeWrapper. +/// +/// +/// For example: we can use `ColumnDataTypeWrapper::int8_datatype()`, +/// to get a ColumnDataTypeWrapper with datatype `ColumnDataType::Int8`. +macro_rules! impl_column_type_functions { + ($($Type: ident), +) => { + paste! { + impl ColumnDataTypeWrapper { + $( + pub fn [<$Type:lower _datatype>]() -> ColumnDataTypeWrapper { + ColumnDataTypeWrapper { + datatype: ColumnDataType::$Type, + datatype_ext: None, + } + } + )+ + } + } + } +} + +/// This macro is used to generate datatype functions +/// with snake style for ColumnDataTypeWrapper. +/// +/// +/// For example: we can use `ColumnDataTypeWrapper::duration_second_datatype()`, +/// to get a ColumnDataTypeWrapper with datatype `ColumnDataType::DurationSecond`. +macro_rules! impl_column_type_functions_with_snake { + ($($TypeName: ident), +) => { + paste!{ + impl ColumnDataTypeWrapper { + $( + pub fn [<$TypeName:snake _datatype>]() -> ColumnDataTypeWrapper { + ColumnDataTypeWrapper { + datatype: ColumnDataType::$TypeName, + datatype_ext: None, + } + } + )+ + } + } + }; +} + +impl_column_type_functions!( + Boolean, Uint8, Uint16, Uint32, Uint64, Int8, Int16, Int32, Int64, Float32, Float64, Binary, + Date, Datetime, String +); + +impl_column_type_functions_with_snake!( + TimestampSecond, + TimestampMillisecond, + TimestampMicrosecond, + TimestampNanosecond, + TimeSecond, + TimeMillisecond, + TimeMicrosecond, + TimeNanosecond, + IntervalYearMonth, + IntervalDayTime, + IntervalMonthDayNano, + DurationSecond, + DurationMillisecond, + DurationMicrosecond, + DurationNanosecond +); + +impl ColumnDataTypeWrapper { + pub fn decimal128_datatype(precision: i32, scale: i32) -> Self { + ColumnDataTypeWrapper { + datatype: ColumnDataType::Decimal128, + datatype_ext: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { + precision, + scale, + })), + }), } } } @@ -117,7 +237,7 @@ impl TryFrom for ColumnDataTypeWrapper { type Error = error::Error; fn try_from(datatype: ConcreteDataType) -> Result { - let datatype = ColumnDataTypeWrapper(match datatype { + let column_datatype = match datatype { ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, ConcreteDataType::Int8(_) => ColumnDataType::Int8, ConcreteDataType::Int16(_) => ColumnDataType::Int16, @@ -156,14 +276,30 @@ impl TryFrom for ColumnDataTypeWrapper { DurationType::Microsecond(_) => ColumnDataType::DurationMicrosecond, DurationType::Nanosecond(_) => ColumnDataType::DurationNanosecond, }, + ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, ConcreteDataType::Null(_) | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + | ConcreteDataType::Dictionary(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } - }); - Ok(datatype) + }; + let datatype_extension = match column_datatype { + ColumnDataType::Decimal128 => { + datatype + .as_decimal128() + .map(|decimal_type| ColumnDataTypeExtension { + type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { + precision: decimal_type.precision() as i32, + scale: decimal_type.scale() as i32, + })), + }) + } + _ => None, + }; + Ok(Self { + datatype: column_datatype, + datatype_ext: datatype_extension, + }) } } @@ -289,6 +425,10 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values duration_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::Decimal128 => Values { + decimal128_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } @@ -342,7 +482,8 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { TimeUnit::Microsecond => values.duration_microsecond_values.push(val.value()), TimeUnit::Nanosecond => values.duration_nanosecond_values.push(val.value()), }, - Value::List(_) | Value::Decimal128(_) => unreachable!(), + Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)), + Value::List(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); } @@ -382,17 +523,29 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { } /// Converts an i128 value to google protobuf type [IntervalMonthDayNano]. -pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano { +pub fn convert_i128_to_interval(v: i128) -> v1::IntervalMonthDayNano { let interval = Interval::from_i128(v); let (months, days, nanoseconds) = interval.to_month_day_nano(); - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months, days, nanoseconds, } } -pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { +/// Convert common decimal128 to grpc decimal128 without precision and scale. +pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 { + let value = v.val(); + v1::Decimal128 { + hi: (value >> 64) as i64, + lo: value as i64, + } +} + +pub fn pb_value_to_value_ref<'a>( + value: &'a v1::Value, + datatype_ext: &'a Option, +) -> ValueRef<'a> { let Some(value) = &value.value_data else { return ValueRef::Null; }; @@ -437,6 +590,28 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::DurationMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), ValueData::DurationMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), + ValueData::Decimal128Value(v) => { + // get precision and scale from datatype_extension + if let Some(TypeExt::DecimalType(d)) = datatype_ext + .as_ref() + .and_then(|column_ext| column_ext.type_ext.as_ref()) + { + ValueRef::Decimal128(Decimal128::from_value_precision_scale( + v.hi, + v.lo, + d.precision as u8, + d.scale as i8, + )) + } else { + // If the precision and scale are not set, use the default value. + ValueRef::Decimal128(Decimal128::from_value_precision_scale( + v.hi, + v.lo, + DECIMAL128_MAX_PRECISION, + DECIMAL128_DEFAULT_SCALE, + )) + } + } } } @@ -523,10 +698,12 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> values.duration_nanosecond_values, )), }, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + ConcreteDataType::Decimal128(d) => Arc::new(Decimal128Vector::from_values( + values.decimal128_values.iter().map(|x| { + Decimal128::from_value_precision_scale(x.hi, x.lo, d.precision(), d.scale()).into() + }), + )), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } } @@ -696,10 +873,19 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + ConcreteDataType::Decimal128(d) => values + .decimal128_values + .into_iter() + .map(|v| { + Value::Decimal128(Decimal128::from_value_precision_scale( + v.hi, + v.lo, + d.precision(), + d.scale(), + )) + }) + .collect(), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } } @@ -711,12 +897,14 @@ pub fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool } /// Returns true if the pb type value is valid. -pub fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { - let Ok(column_type) = ColumnDataType::try_from(type_value) else { - return false; - }; - - is_column_type_eq(column_type, expect_type) +pub fn is_column_type_value_eq( + type_value: i32, + type_extension: Option, + expect_type: &ConcreteDataType, +) -> bool { + ColumnDataTypeWrapper::try_new(type_value, type_extension) + .map(|wrapper| ConcreteDataType::from(wrapper) == *expect_type) + .unwrap_or(false) } /// Convert value into proto's value. @@ -823,13 +1011,19 @@ pub fn to_proto_value(value: Value) -> Option { value_data: Some(ValueData::DurationNanosecondValue(v.value())), }, }, - Value::List(_) | Value::Decimal128(_) => return None, + Value::Decimal128(v) => { + let (hi, lo) = v.split_value(); + v1::Value { + value_data: Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })), + } + } + Value::List(_) => return None, }; Some(proto_value) } -/// Returns the [ColumnDataType] of the value. +/// Returns the [ColumnDataTypeWrapper] of the value. /// /// If value is null, returns `None`. pub fn proto_value_type(value: &v1::Value) -> Option { @@ -864,66 +1058,11 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::DurationMillisecondValue(_) => ColumnDataType::DurationMillisecond, ValueData::DurationMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, ValueData::DurationNanosecondValue(_) => ColumnDataType::DurationNanosecond, + ValueData::Decimal128Value(_) => ColumnDataType::Decimal128, }; Some(value_type) } -/// Convert [ConcreteDataType] to [ColumnDataType]. -pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option { - let column_data_type = match data_type { - ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, - ConcreteDataType::Int8(_) => ColumnDataType::Int8, - ConcreteDataType::Int16(_) => ColumnDataType::Int16, - ConcreteDataType::Int32(_) => ColumnDataType::Int32, - ConcreteDataType::Int64(_) => ColumnDataType::Int64, - ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, - ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, - ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, - ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, - ConcreteDataType::Float32(_) => ColumnDataType::Float32, - ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) => ColumnDataType::Binary, - ConcreteDataType::String(_) => ColumnDataType::String, - ConcreteDataType::Date(_) => ColumnDataType::Date, - ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, - ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { - ColumnDataType::TimestampMillisecond - } - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { - ColumnDataType::TimestampMicrosecond - } - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { - ColumnDataType::TimestampNanosecond - } - ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, - ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, - ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, - ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, - ConcreteDataType::Duration(DurationType::Second(_)) => ColumnDataType::DurationSecond, - ConcreteDataType::Duration(DurationType::Millisecond(_)) => { - ColumnDataType::DurationMillisecond - } - ConcreteDataType::Duration(DurationType::Microsecond(_)) => { - ColumnDataType::DurationMicrosecond - } - ConcreteDataType::Duration(DurationType::Nanosecond(_)) => { - ColumnDataType::DurationNanosecond - } - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => ColumnDataType::IntervalYearMonth, - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => { - ColumnDataType::IntervalMonthDayNano - } - ConcreteDataType::Interval(IntervalType::DayTime(_)) => ColumnDataType::IntervalDayTime, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => return None, - }; - - Some(column_data_type) -} - pub fn vectors_to_rows<'a>( columns: impl Iterator, row_count: usize, @@ -982,20 +1121,15 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Microsecond => ValueData::DurationMicrosecondValue(v.value()), TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), - Value::List(_) | Value::Decimal128(_) => unreachable!(), + Value::Decimal128(v) => { + let (hi, lo) = v.split_value(); + Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })) + } + Value::List(_) => unreachable!(), }, } } -/// Returns true if the column type is equal to expected type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - if let Some(expect) = to_column_data_type(expect_type) { - column_type == expect - } else { - false - } -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -1089,189 +1223,204 @@ mod tests { let values = values_with_capacity(ColumnDataType::DurationMillisecond, 2); let values = values.duration_millisecond_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::Decimal128, 2); + let values = values.decimal128_values; + assert_eq!(2, values.capacity()); } #[test] fn test_concrete_datatype_from_column_datatype() { assert_eq!( ConcreteDataType::boolean_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Boolean).into() + ColumnDataTypeWrapper::boolean_datatype().into() ); assert_eq!( ConcreteDataType::int8_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int8).into() + ColumnDataTypeWrapper::int8_datatype().into() ); assert_eq!( ConcreteDataType::int16_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int16).into() + ColumnDataTypeWrapper::int16_datatype().into() ); assert_eq!( ConcreteDataType::int32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int32).into() + ColumnDataTypeWrapper::int32_datatype().into() ); assert_eq!( ConcreteDataType::int64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int64).into() + ColumnDataTypeWrapper::int64_datatype().into() ); assert_eq!( ConcreteDataType::uint8_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint8).into() + ColumnDataTypeWrapper::uint8_datatype().into() ); assert_eq!( ConcreteDataType::uint16_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint16).into() + ColumnDataTypeWrapper::uint16_datatype().into() ); assert_eq!( ConcreteDataType::uint32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint32).into() + ColumnDataTypeWrapper::uint32_datatype().into() ); assert_eq!( ConcreteDataType::uint64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint64).into() + ColumnDataTypeWrapper::uint64_datatype().into() ); assert_eq!( ConcreteDataType::float32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Float32).into() + ColumnDataTypeWrapper::float32_datatype().into() ); assert_eq!( ConcreteDataType::float64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Float64).into() + ColumnDataTypeWrapper::float64_datatype().into() ); assert_eq!( ConcreteDataType::binary_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Binary).into() + ColumnDataTypeWrapper::binary_datatype().into() ); assert_eq!( ConcreteDataType::string_datatype(), - ColumnDataTypeWrapper(ColumnDataType::String).into() + ColumnDataTypeWrapper::string_datatype().into() ); assert_eq!( ConcreteDataType::date_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Date).into() + ColumnDataTypeWrapper::date_datatype().into() ); assert_eq!( ConcreteDataType::datetime_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Datetime).into() + ColumnDataTypeWrapper::datetime_datatype().into() ); assert_eq!( ConcreteDataType::timestamp_millisecond_datatype(), - ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into() + ColumnDataTypeWrapper::timestamp_millisecond_datatype().into() ); assert_eq!( ConcreteDataType::time_datatype(TimeUnit::Millisecond), - ColumnDataTypeWrapper(ColumnDataType::TimeMillisecond).into() + ColumnDataTypeWrapper::time_millisecond_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::DayTime), - ColumnDataTypeWrapper(ColumnDataType::IntervalDayTime).into() + ColumnDataTypeWrapper::interval_day_time_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::YearMonth), - ColumnDataTypeWrapper(ColumnDataType::IntervalYearMonth).into() + ColumnDataTypeWrapper::interval_year_month_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano), - ColumnDataTypeWrapper(ColumnDataType::IntervalMonthDayNano).into() + ColumnDataTypeWrapper::interval_month_day_nano_datatype().into() ); assert_eq!( ConcreteDataType::duration_millisecond_datatype(), - ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond).into() + ColumnDataTypeWrapper::duration_millisecond_datatype().into() + ); + assert_eq!( + ConcreteDataType::decimal128_datatype(10, 2), + ColumnDataTypeWrapper::decimal128_datatype(10, 2).into() ) } #[test] fn test_column_datatype_from_concrete_datatype() { assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Boolean), + ColumnDataTypeWrapper::boolean_datatype(), ConcreteDataType::boolean_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int8), + ColumnDataTypeWrapper::int8_datatype(), ConcreteDataType::int8_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int16), + ColumnDataTypeWrapper::int16_datatype(), ConcreteDataType::int16_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int32), + ColumnDataTypeWrapper::int32_datatype(), ConcreteDataType::int32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int64), + ColumnDataTypeWrapper::int64_datatype(), ConcreteDataType::int64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint8), + ColumnDataTypeWrapper::uint8_datatype(), ConcreteDataType::uint8_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint16), + ColumnDataTypeWrapper::uint16_datatype(), ConcreteDataType::uint16_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint32), + ColumnDataTypeWrapper::uint32_datatype(), ConcreteDataType::uint32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint64), + ColumnDataTypeWrapper::uint64_datatype(), ConcreteDataType::uint64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Float32), + ColumnDataTypeWrapper::float32_datatype(), ConcreteDataType::float32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Float64), + ColumnDataTypeWrapper::float64_datatype(), ConcreteDataType::float64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Binary), + ColumnDataTypeWrapper::binary_datatype(), ConcreteDataType::binary_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::String), + ColumnDataTypeWrapper::string_datatype(), ConcreteDataType::string_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Date), + ColumnDataTypeWrapper::date_datatype(), ConcreteDataType::date_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Datetime), + ColumnDataTypeWrapper::datetime_datatype(), ConcreteDataType::datetime_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond), + ColumnDataTypeWrapper::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype() .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalYearMonth), + ColumnDataTypeWrapper::interval_year_month_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::YearMonth) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalDayTime), + ColumnDataTypeWrapper::interval_day_time_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::DayTime) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalMonthDayNano), + ColumnDataTypeWrapper::interval_month_day_nano_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond), + ColumnDataTypeWrapper::duration_millisecond_datatype(), ConcreteDataType::duration_millisecond_datatype() .try_into() .unwrap() ); + assert_eq!( + ColumnDataTypeWrapper::decimal128_datatype(10, 2), + ConcreteDataType::decimal128_datatype(10, 2) + .try_into() + .unwrap() + ); + let result: Result = ConcreteDataType::null_datatype().try_into(); assert!(result.is_err()); assert_eq!( @@ -1298,6 +1447,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1339,6 +1489,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1380,6 +1531,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(IntervalYearMonthVector::from_vec(vec![1, 2, 3])); @@ -1424,6 +1576,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(DurationNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1468,6 +1621,7 @@ mod tests { }), null_mask: vec![2], datatype: ColumnDataType::Boolean as i32, + ..Default::default() }; let row_count = 4; @@ -1625,17 +1779,17 @@ mod tests { &ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)), Values { interval_month_day_nano_values: vec![ - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 1, days: 2, nanoseconds: 3, }, - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 5, days: 6, nanoseconds: 7, }, - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 9, days: 10, nanoseconds: 11, @@ -1867,4 +2021,33 @@ mod tests { assert_eq!(values[6], ValueData::DateValue(30)); assert_eq!(values[7], ValueData::StringValue("c".to_string())); } + + #[test] + fn test_is_column_type_value_eq() { + // test column type eq + let column1 = Column { + column_name: "test".to_string(), + semantic_type: 0, + values: Some(Values { + bool_values: vec![false, true, true], + ..Default::default() + }), + null_mask: vec![2], + datatype: ColumnDataType::Boolean as i32, + datatype_extension: None, + }; + assert!(is_column_type_value_eq( + column1.datatype, + column1.datatype_extension, + &ConcreteDataType::boolean_datatype(), + )); + } + + #[test] + fn test_convert_to_pb_decimal128() { + let decimal = Decimal128::new(123, 3, 1); + let pb_decimal = convert_to_pb_decimal128(decimal); + assert_eq!(pb_decimal.lo, 123); + assert_eq!(pb_decimal.hi, 0); + } } diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 7a812d005fae..4a077d3b5451 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -22,7 +22,10 @@ use crate::helper::ColumnDataTypeWrapper; use crate::v1::ColumnDef; pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { - let data_type = ColumnDataTypeWrapper::try_new(column_def.data_type)?; + let data_type = ColumnDataTypeWrapper::try_new( + column_def.data_type, + column_def.datatype_extension.clone(), + )?; let constraint = if column_def.default_constraint.is_empty() { None diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index ca7d3ae96ba3..ec4c11cdd9f9 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -46,6 +46,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "key".to_string(), @@ -54,6 +55,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "value".to_string(), @@ -62,6 +64,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "timestamp".to_string(), diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index c68039fb57a7..ce23fcf98a97 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -96,10 +96,25 @@ impl Decimal128 { self.scale } - /// Convert to ScalarValue + /// Convert to ScalarValue(value,precision,scale) pub fn to_scalar_value(&self) -> (Option, u8, i8) { (Some(self.value), self.precision, self.scale) } + + /// split the self.value(i128) to (high-64 bit, low-64 bit), and + /// the precision, scale information is discarded. + /// + /// Return: (high-64 bit, low-64 bit) + pub fn split_value(&self) -> (i64, i64) { + ((self.value >> 64) as i64, self.value as i64) + } + + /// Convert from precision, scale, a i128 value which + /// represents by two i64 value(high-64 bit, low-64 bit). + pub fn from_value_precision_scale(hi: i64, lo: i64, precision: u8, scale: i8) -> Self { + let value = (hi as i128) << 64 | lo as i128; + Self::new(value, precision, scale) + } } /// The default value of Decimal128 is 0, and its precision is 1 and scale is 0. diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 462eefde1a27..532f9cf15c48 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -158,6 +158,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: None, }], @@ -199,6 +200,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: Some(Location { location_type: LocationType::First.into(), @@ -213,6 +215,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: Some(Location { location_type: LocationType::After.into(), diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs index d272b6aa0bc0..ff737fcdfc60 100644 --- a/src/common/grpc-expr/src/delete.rs +++ b/src/common/grpc-expr/src/delete.rs @@ -36,14 +36,16 @@ pub fn to_table_delete_request( values, null_mask, datatype, + datatype_extension, .. } in request.key_columns { let Some(values) = values else { continue }; - let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype) - .context(ColumnDataTypeSnafu)? - .into(); + let datatype: ConcreteDataType = + ColumnDataTypeWrapper::try_new(datatype, datatype_extension) + .context(ColumnDataTypeSnafu)? + .into(); let vector = add_values_to_builder(datatype, values, row_count, null_mask)?; ensure!( diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 5b173b6fdc67..746189ee2b53 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -119,7 +119,7 @@ mod tests { nullable: bool, ) -> error::Result { let datatype_wrapper = - ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?; + ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?; Ok(ColumnSchema::new( column_name, @@ -170,7 +170,8 @@ mod tests { .iter() .find(|c| c.name == "host") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -184,7 +185,8 @@ mod tests { .iter() .find(|c| c.name == "cpu") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -198,7 +200,8 @@ mod tests { .iter() .find(|c| c.name == "memory") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -212,7 +215,8 @@ mod tests { .iter() .find(|c| c.name == "time") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -226,7 +230,8 @@ mod tests { .iter() .find(|c| c.name == "interval") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -240,7 +245,8 @@ mod tests { .iter() .find(|c| c.name == "duration") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -254,7 +260,8 @@ mod tests { .iter() .find(|c| c.name == "ts") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -284,8 +291,11 @@ mod tests { assert_eq!( ConcreteDataType::string_datatype(), ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(host_column.column_def.as_ref().unwrap().data_type) - .unwrap() + ColumnDataTypeWrapper::try_new( + host_column.column_def.as_ref().unwrap().data_type, + None + ) + .unwrap() ) ); @@ -294,7 +304,8 @@ mod tests { ConcreteDataType::float64_datatype(), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - memory_column.column_def.as_ref().unwrap().data_type + memory_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -304,8 +315,11 @@ mod tests { assert_eq!( ConcreteDataType::time_datatype(TimeUnit::Millisecond), ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(time_column.column_def.as_ref().unwrap().data_type) - .unwrap() + ColumnDataTypeWrapper::try_new( + time_column.column_def.as_ref().unwrap().data_type, + None + ) + .unwrap() ) ); @@ -314,7 +328,8 @@ mod tests { ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - interval_column.column_def.as_ref().unwrap().data_type + interval_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -326,7 +341,8 @@ mod tests { ConcreteDataType::duration_millisecond_datatype(), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - duration_column.column_def.as_ref().unwrap().data_type + duration_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -360,6 +376,7 @@ mod tests { values: Some(host_vals), null_mask: vec![0], datatype: ColumnDataType::String as i32, + ..Default::default() }; let cpu_vals = Values { @@ -372,6 +389,7 @@ mod tests { values: Some(cpu_vals), null_mask: vec![2], datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let mem_vals = Values { @@ -384,6 +402,7 @@ mod tests { values: Some(mem_vals), null_mask: vec![1], datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let time_vals = Values { @@ -396,6 +415,7 @@ mod tests { values: Some(time_vals), null_mask: vec![0], datatype: ColumnDataType::TimeMillisecond as i32, + ..Default::default() }; let interval1 = IntervalMonthDayNano { @@ -418,6 +438,7 @@ mod tests { values: Some(interval_vals), null_mask: vec![0], datatype: ColumnDataType::IntervalMonthDayNano as i32, + ..Default::default() }; let duration_vals = Values { @@ -430,6 +451,7 @@ mod tests { values: Some(duration_vals), null_mask: vec![0], datatype: ColumnDataType::DurationMillisecond as i32, + ..Default::default() }; let ts_vals = Values { @@ -442,6 +464,7 @@ mod tests { values: Some(ts_vals), null_mask: vec![0], datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() }; ( diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index 337a0a16e6e2..f7068bc875d8 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -121,6 +121,7 @@ pub fn build_create_table_expr( default_constraint: vec![], semantic_type, comment: String::new(), + ..Default::default() }; column_defs.push(column_def); } @@ -161,6 +162,7 @@ pub fn extract_new_columns( default_constraint: vec![], semantic_type: expr.semantic_type, comment: String::new(), + ..Default::default() }); AddColumn { column_def, diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 4c6e4a6af99c..6b53a5900c73 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::helper::convert_i128_to_interval; +use api::helper::{convert_i128_to_interval, convert_to_pb_decimal128}; use api::v1::column::Values; use common_base::BitVec; use datatypes::types::{DurationType, IntervalType, TimeType, TimestampType, WrapperType}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, - Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalDayTimeVector, - IntervalMonthDayNanoVector, IntervalYearMonthVector, StringVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, - UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, + Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, + StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, + TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, UInt32Vector, UInt64Vector, + UInt8Vector, VectorRef, }; use snafu::OptionExt; @@ -71,8 +72,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - // TODO(QuenKar): support gRPC for Decimal128 - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Decimal128(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } @@ -238,6 +238,12 @@ pub fn values(arrays: &[VectorRef]) -> Result { DurationNanosecondVector, duration_nanosecond_values, |x| { x.into_native() } + ), + ( + ConcreteDataType::Decimal128(_), + Decimal128Vector, + decimal128_values, + |x| { convert_to_pb_decimal128(x) } ) ) } @@ -315,6 +321,17 @@ mod tests { assert_eq!(vec![1, 2, 3], values.duration_second_values); } + #[test] + fn test_convert_arrow_array_decimal128() { + let array = Decimal128Vector::from(vec![Some(1), Some(2), None, Some(3)]); + + let vals = values(&[Arc::new(array)]).unwrap(); + (0..3).for_each(|i| { + assert_eq!(vals.decimal128_values[i].hi, 0); + assert_eq!(vals.decimal128_values[i].lo, i as i64 + 1); + }); + } + #[test] fn test_convert_arrow_arrays_string() { let array = StringVector::from(vec![ diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 7efa3dfd5cad..3a8e9238287e 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use api::helper::values_with_capacity; -use api::v1::{Column, ColumnDataType, SemanticType}; +use api::v1::{Column, ColumnDataType, ColumnDataTypeExtension, SemanticType}; use common_base::BitVec; use common_time::timestamp::TimeUnit; use snafu::ensure; @@ -50,6 +50,7 @@ impl LinesWriter { column_name, ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, + None, ); ensure!( column.datatype == ColumnDataType::TimestampMillisecond as i32, @@ -69,7 +70,8 @@ impl LinesWriter { } pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag); + let (idx, column) = + self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag, None); ensure!( column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { @@ -86,8 +88,12 @@ impl LinesWriter { } pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Uint64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Uint64 as i32, TypeMismatchSnafu { @@ -104,8 +110,12 @@ impl LinesWriter { } pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Int64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Int64 as i32, TypeMismatchSnafu { @@ -122,8 +132,12 @@ impl LinesWriter { } pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Float64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Float64 as i32, TypeMismatchSnafu { @@ -140,8 +154,12 @@ impl LinesWriter { } pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::String, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::String, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { @@ -158,8 +176,12 @@ impl LinesWriter { } pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Boolean, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Boolean as i32, TypeMismatchSnafu { @@ -201,6 +223,7 @@ impl LinesWriter { column_name: &str, datatype: ColumnDataType, semantic_type: SemanticType, + datatype_extension: Option, ) -> (usize, &mut Column) { let column_names = &mut self.column_name_index; let column_idx = match column_names.get(column_name) { @@ -218,6 +241,7 @@ impl LinesWriter { values: Some(values_with_capacity(datatype, to_insert)), datatype: datatype as i32, null_mask: Vec::default(), + datatype_extension, }); let _ = column_names.insert(column_name.to_string(), new_idx); new_idx diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 54632e9b3f05..85b4bb8d23da 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -132,6 +132,7 @@ impl CreateTableProcedure { default_constraint: c.default_constraint.clone(), semantic_type: semantic_type as i32, comment: String::new(), + datatype_extension: c.datatype_extension.clone(), }), column_id: i as u32, } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 877bf47e3bf5..61470eec6bd7 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -142,6 +142,7 @@ impl ConcreteDataType { | ConcreteDataType::Time(_) | ConcreteDataType::Interval(_) | ConcreteDataType::Duration(_) + | ConcreteDataType::Decimal128(_) ) } @@ -676,6 +677,7 @@ mod tests { assert!(ConcreteDataType::duration_millisecond_datatype().is_stringifiable()); assert!(ConcreteDataType::duration_microsecond_datatype().is_stringifiable()); assert!(ConcreteDataType::duration_nanosecond_datatype().is_stringifiable()); + assert!(ConcreteDataType::decimal128_datatype(10, 2).is_stringifiable()); } #[test] diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs index edda8fe9f7eb..48ede0c44136 100644 --- a/src/datatypes/src/types/decimal_type.rs +++ b/src/datatypes/src/types/decimal_type.rs @@ -13,6 +13,7 @@ // limitations under the License. use arrow_schema::DataType as ArrowDataType; +use common_decimal::decimal128::DECIMAL128_MAX_PRECISION; use common_decimal::Decimal128; use serde::{Deserialize, Serialize}; @@ -32,7 +33,17 @@ pub struct Decimal128Type { impl Decimal128Type { pub fn new(precision: u8, scale: i8) -> Self { - Self { precision, scale } + // assert precision and scale is valid + assert!( + precision > 0 && precision <= DECIMAL128_MAX_PRECISION, + "precision should be in [1, {}]", + DECIMAL128_MAX_PRECISION + ); + assert!( + scale >= 0 && scale <= precision as i8, + "scale should be in [0, precision]" + ); + Decimal128Type { precision, scale } } pub fn precision(&self) -> u8 { @@ -46,7 +57,8 @@ impl Decimal128Type { impl DataType for Decimal128Type { fn name(&self) -> &str { - "decimal128" + // TODO(QuenKar): support precision and scale information in name + "decimal" } fn logical_type_id(&self) -> LogicalTypeId { @@ -62,7 +74,12 @@ impl DataType for Decimal128Type { } fn create_mutable_vector(&self, capacity: usize) -> Box { - Box::new(Decimal128VectorBuilder::with_capacity(capacity)) + Box::new( + Decimal128VectorBuilder::with_capacity(capacity) + .with_precision_and_scale(self.precision, self.scale) + // safe to unwrap because we have validated the precision and scale in new() + .unwrap(), + ) } fn try_cast(&self, val: Value) -> Option { diff --git a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs index 1303303d7b66..ebcdb43d6e34 100644 --- a/src/datatypes/src/vectors/decimal.rs +++ b/src/datatypes/src/vectors/decimal.rs @@ -392,7 +392,26 @@ pub mod tests { let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]); let decimal_vector = Decimal128Vector::from(decimal_array); let expect = Decimal128Vector::from_values(vec![123, 456]); + assert_eq!(decimal_vector, expect); + + let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]) + .with_precision_and_scale(10, 2) + .unwrap(); + let decimal_vector = Decimal128Vector::from(decimal_array); + let expect = Decimal128Vector::from_values(vec![123, 456]) + .with_precision_and_scale(10, 2) + .unwrap(); + assert_eq!(decimal_vector, expect); + let decimal_array: ArrayRef = Arc::new( + Decimal128Array::from(vec![Some(123), Some(456)]) + .with_precision_and_scale(3, 2) + .unwrap(), + ); + let decimal_vector = Decimal128Vector::try_from_arrow_array(decimal_array).unwrap(); + let expect = Decimal128Vector::from_values(vec![123, 456]) + .with_precision_and_scale(3, 2) + .unwrap(); assert_eq!(decimal_vector, expect); } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index d9ca7a7d31bc..49d09a2ad3fc 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -54,6 +54,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_tag1".to_string(), @@ -62,6 +63,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_tag2".to_string(), @@ -70,6 +72,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_field_column".to_string(), @@ -78,6 +81,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "ts".to_string(), @@ -114,6 +118,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }), column_id: 0, }, @@ -125,6 +130,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), column_id: 1, }, @@ -136,6 +142,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), column_id: 2, }, @@ -147,6 +154,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), column_id: 3, }, @@ -287,6 +295,7 @@ fn test_create_alter_region_request() { default_constraint: b"hello".to_vec(), semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), location: Some(AddColumnLocation { location_type: LocationType::After as i32, @@ -321,7 +330,8 @@ fn test_create_alter_region_request() { is_nullable: true, default_constraint: b"hello".to_vec(), semantic_type: SemanticType::Tag as i32, - comment: String::new() + comment: String::new(), + ..Default::default() }), column_id: 3, }), diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index c5520cd84f01..8baa37a7cfbc 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -15,9 +15,8 @@ use std::hash::{BuildHasher, Hash, Hasher}; use ahash::RandomState; -use api::helper::to_column_data_type; use api::v1::value::ValueData; -use api::v1::{ColumnSchema, Row, Rows, SemanticType}; +use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_query::Output; use common_telemetry::{error, info}; use datatypes::data_type::ConcreteDataType; @@ -162,18 +161,16 @@ impl MetricEngineInner { // add table_name column rows.schema.push(ColumnSchema { column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), - datatype: to_column_data_type(&ConcreteDataType::uint32_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Uint32 as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); // add tsid column rows.schema.push(ColumnSchema { column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: to_column_data_type(&ConcreteDataType::uint64_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Uint64 as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); // fill internal columns diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 28098b4e5f5d..e65a4526e690 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -349,16 +349,19 @@ impl MetadataRegion { column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), datatype: ColumnDataType::TimestampMillisecond as _, semantic_type: SemanticType::Timestamp as _, + ..Default::default() }, ColumnSchema { column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), datatype: ColumnDataType::String as _, semantic_type: SemanticType::Tag as _, + ..Default::default() }, ColumnSchema { column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), datatype: ColumnDataType::String as _, semantic_type: SemanticType::Field as _, + ..Default::default() }, ]; let rows = Rows { diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index f71fff63719d..664c35f8367d 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -14,9 +14,8 @@ //! Utilities for testing. -use api::helper::to_column_data_type; use api::v1::value::ValueData; -use api::v1::{ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; +use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::config::MitoConfig; @@ -233,26 +232,23 @@ pub fn row_schema_with_tags(tags: &[&str]) -> Vec { let mut schema = vec![ PbColumnSchema { column_name: "greptime_timestamp".to_string(), - datatype: to_column_data_type(&ConcreteDataType::timestamp_millisecond_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, }, PbColumnSchema { column_name: "greptime_value".to_string(), - datatype: to_column_data_type(&ConcreteDataType::float64_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Float64 as i32, semantic_type: SemanticType::Field as _, + datatype_extension: None, }, ]; for tag in tags { schema.push(PbColumnSchema { column_name: tag.to_string(), - datatype: to_column_data_type(&ConcreteDataType::string_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); } schema diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index c3d4045df334..a7c3d19caeb5 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -220,6 +220,7 @@ async fn test_put_after_alter() { column_name: "tag_1".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }); // Put with new schema. let rows = Rows { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e9aba0bf6f8f..fbf66ac284f1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -155,6 +155,16 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to convert ConcreteDataType to ColumnDataType, reason: {}", + reason + ))] + ConvertColumnDataType { + reason: String, + source: api::error::Error, + location: Location, + }, + /// An error type to indicate that schema is changed and we need /// to fill default values again. #[snafu(display("Need to fill default value for region {}", region_id))] @@ -438,6 +448,7 @@ impl ErrorExt for Error { | InvalidMeta { .. } | InvalidRequest { .. } | FillDefault { .. } + | ConvertColumnDataType { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 37906a0c5c3f..10854d23a8ae 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::{Mutation, OpType, Row, Rows}; +use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows}; use datatypes::value::ValueRef; use store_api::metadata::RegionMetadata; use store_api::storage::SequenceNumber; @@ -45,9 +45,11 @@ impl KeyValues { /// Returns a key value iterator. pub fn iter(&self) -> impl Iterator { let rows = self.mutation.rows.as_ref().unwrap(); + let schema = &rows.schema; rows.rows.iter().enumerate().map(|(idx, row)| { KeyValue { row, + schema, helper: &self.helper, sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row. // Safety: This is a valid mutation. @@ -72,6 +74,7 @@ impl KeyValues { #[derive(Debug)] pub struct KeyValue<'a> { row: &'a Row, + schema: &'a Vec, helper: &'a ReadRowHelper, sequence: SequenceNumber, op_type: OpType, @@ -82,21 +85,34 @@ impl<'a> KeyValue<'a> { pub fn primary_keys(&self) -> impl Iterator { self.helper.indices[..self.helper.num_primary_key_column] .iter() - .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) + .map(|idx| { + api::helper::pb_value_to_value_ref( + &self.row.values[*idx], + &self.schema[*idx].datatype_extension, + ) + }) } /// Get field columns. pub fn fields(&self) -> impl Iterator { self.helper.indices[self.helper.num_primary_key_column + 1..] .iter() - .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) + .map(|idx| { + api::helper::pb_value_to_value_ref( + &self.row.values[*idx], + &self.schema[*idx].datatype_extension, + ) + }) } /// Get timestamp. pub fn timestamp(&self) -> ValueRef { // Timestamp is primitive, we clone it. let index = self.helper.indices[self.helper.num_primary_key_column]; - api::helper::pb_value_to_value_ref(&self.row.values[index]) + api::helper::pb_value_to_value_ref( + &self.row.values[index], + &self.schema[index].datatype_extension, + ) } /// Get number of primary key columns. @@ -233,6 +249,7 @@ mod tests { column_name: column_name.to_string(), datatype, semantic_type, + ..Default::default() } }) .collect(); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 0289d932cd70..4dd590212c33 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -953,6 +953,7 @@ mod tests { .unwrap() .datatype() as i32, semantic_type: c.semantic_type as i32, + ..Default::default() }) .collect(); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 260e008db248..5ebd5fae110c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use api::helper::{ - is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, - to_proto_value, + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value, + ColumnDataTypeWrapper, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_query::Output; @@ -40,8 +40,8 @@ use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ - CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, - InvalidRequestSnafu, Result, + CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, + FlushRegionSnafu, InvalidRequestSnafu, Result, }; use crate::memtable::MemtableId; use crate::metrics::COMPACTION_ELAPSED_TOTAL; @@ -152,7 +152,11 @@ impl WriteRequest { if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { // Check data type. ensure!( - is_column_type_value_eq(input_col.datatype, &column.column_schema.data_type), + is_column_type_value_eq( + input_col.datatype, + input_col.datatype_extension.clone(), + &column.column_schema.data_type + ), InvalidRequestSnafu { region_id, reason: format!( @@ -248,19 +252,20 @@ impl WriteRequest { } // Insert column schema. - let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| { - InvalidRequestSnafu { - region_id: self.region_id, - reason: format!( - "no protobuf type for column {} ({:?})", - column.column_schema.name, column.column_schema.data_type - ), - } - })?; + let (datatype, datatype_ext) = + ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone()) + .with_context(|_| ConvertColumnDataTypeSnafu { + reason: format!( + "no protobuf type for column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + })? + .to_parts(); self.rows.schema.push(ColumnSchema { column_name: column.column_schema.name.clone(), datatype: datatype as i32, semantic_type: column.semantic_type as i32, + datatype_extension: datatype_ext, }); Ok(()) @@ -715,6 +720,7 @@ mod tests { column_name: name.to_string(), datatype: data_type as i32, semantic_type: semantic_type as i32, + ..Default::default() } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e602643ff5c5..59738d0e2ad4 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -530,12 +530,15 @@ impl WriteBufferManager for MockWriteBufferManager { } pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) + .unwrap() + .to_parts(); api::v1::ColumnSchema { column_name: metadata.column_schema.name.clone(), - datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) - .unwrap() - .datatype() as i32, + datatype: datatype as i32, semantic_type: metadata.semantic_type as i32, + datatype_extension, } } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 85216c04e7ef..2171ceb7b1bd 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -214,11 +214,13 @@ mod tests { column_name: "tag".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "ts".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, + ..Default::default() }, ]; diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index fb855f62028e..aec2b51566d2 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ - AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn, - DropColumns, RenameTable, SemanticType, + AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, + CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -312,14 +312,14 @@ pub fn column_schemas_to_defs( column_schemas: Vec, primary_keys: &[String], ) -> Result> { - let column_datatypes = column_schemas + let column_datatypes: Vec<(ColumnDataType, Option)> = column_schemas .iter() .map(|c| { ColumnDataTypeWrapper::try_from(c.data_type.clone()) - .map(|w| w.datatype()) + .map(|w| w.to_parts()) .context(ColumnDataTypeSnafu) }) - .collect::>>()?; + .collect::>>()?; column_schemas .iter() @@ -340,7 +340,7 @@ pub fn column_schemas_to_defs( Ok(api::v1::ColumnDef { name: schema.name.clone(), - data_type: datatype as i32, + data_type: datatype.0 as i32, is_nullable: schema.is_nullable(), default_constraint: match schema.default_constraint() { None => vec![], @@ -354,6 +354,7 @@ pub fn column_schemas_to_defs( }, semantic_type, comment, + datatype_extension: datatype.1, }) }) .collect() diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 21bca1c33acd..694906b82833 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -20,7 +20,6 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use common_base::BitVec; -use datatypes::prelude::ConcreteDataType; use datatypes::vectors::VectorRef; use snafu::prelude::*; use snafu::ResultExt; @@ -46,6 +45,7 @@ pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { column_name: column.column_name.clone(), datatype: column.datatype, semantic_type: column.semantic_type, + datatype_extension: column.datatype_extension.clone(), }; schema.push(column_schema); @@ -57,7 +57,7 @@ pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { let null_mask = BitVec::from_vec(column.null_mask); - let column_type = ColumnDataTypeWrapper::try_new(column.datatype) + let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension) .context(ColumnDataTypeSnafu)? .datatype(); let column_values = column.values.unwrap_or_default(); @@ -177,6 +177,7 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { DurationNanosecondValue, duration_nanosecond_values ), + (Decimal128, Decimal128Value, decimal128_values), ); Ok(()) @@ -206,10 +207,16 @@ pub fn column_schema( columns .iter() .map(|(column_name, vector)| { + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(vector.data_type().clone()) + .context(ColumnDataTypeSnafu)? + .to_parts(); + Ok(ColumnSchema { column_name: column_name.clone(), - datatype: data_type(vector.data_type())?.into(), + datatype: datatype as i32, semantic_type: semantic_type(table_info, column_name)?.into(), + datatype_extension, }) }) .collect::>>() @@ -245,11 +252,6 @@ fn semantic_type(table_info: &TableInfo, column: &str) -> Result { Ok(semantic_type) } -fn data_type(data_type: ConcreteDataType) -> Result { - let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?; - Ok(datatype.datatype()) -} - #[cfg(test)] mod tests { use api::v1::column::Values; @@ -270,6 +272,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }, Column { column_name: String::from("col2"), @@ -284,6 +287,7 @@ mod tests { ], ..Default::default() }), + ..Default::default() }, ]; let row_count = 3; @@ -335,6 +339,7 @@ mod tests { i8_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); @@ -349,6 +354,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); @@ -363,6 +369,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index 0b578c8697c3..ba93ad0f4afb 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -156,6 +156,7 @@ mod tests { column_name: "a".to_string(), datatype: ColumnDataType::Int32 as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }], rows: rows .into_iter() diff --git a/src/operator/src/req_convert/insert.rs b/src/operator/src/req_convert/insert.rs index 96f88a739b7b..51984c4de034 100644 --- a/src/operator/src/req_convert/insert.rs +++ b/src/operator/src/req_convert/insert.rs @@ -17,17 +17,15 @@ mod row_to_region; mod stmt_to_region; mod table_to_region; -use api::helper::ColumnDataTypeWrapper; -use api::v1::{ColumnDataType, SemanticType}; +use api::v1::SemanticType; pub use column_to_row::ColumnToRow; -use datatypes::prelude::ConcreteDataType; pub use row_to_region::RowToRegion; use snafu::{OptionExt, ResultExt}; pub use stmt_to_region::StatementToRegion; use table::metadata::TableInfo; pub use table_to_region::TableToRegion; -use crate::error::{ColumnDataTypeSnafu, ColumnNotFoundSnafu, MissingTimeIndexColumnSnafu, Result}; +use crate::error::{ColumnNotFoundSnafu, MissingTimeIndexColumnSnafu, Result}; fn semantic_type(table_info: &TableInfo, column: &str) -> Result { let table_meta = &table_info.meta; @@ -58,8 +56,3 @@ fn semantic_type(table_info: &TableInfo, column: &str) -> Result { Ok(semantic_type) } - -fn data_type(data_type: ConcreteDataType) -> Result { - let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?; - Ok(datatype.datatype()) -} diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index 1230bb1427b3..1297adf6a5de 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::helper::value_to_grpc_value; +use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper}; use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue}; use catalog::CatalogManager; @@ -25,10 +25,11 @@ use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; use table::TableRef; -use super::{data_type, semantic_type}; +use super::semantic_type; use crate::error::{ - CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu, - InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, TableNotFoundSnafu, + CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, + ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, + TableNotFoundSnafu, }; use crate::req_convert::common::partitioner::Partitioner; @@ -94,13 +95,17 @@ impl<'a> StatementToRegion<'a> { msg: format!("Column {} not found in table {}", column_name, &table_name), })?; - let datatype = data_type(column_schema.data_type.clone())?; + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) + .context(ColumnDataTypeSnafu)? + .to_parts(); let semantic_type = semantic_type(&table_info, column_name)?; let grpc_column_schema = GrpcColumnSchema { column_name: column_name.clone(), datatype: datatype.into(), semantic_type: semantic_type.into(), + datatype_extension, }; schema.push(grpc_column_schema); diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index 5ece06b79f96..729355cf0159 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -156,6 +156,7 @@ mod tests { column_name: "a".to_string(), datatype: ColumnDataType::Int32 as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }], rows: rows .into_iter() diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index c31ee9afae94..7051a9c626a8 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -543,7 +543,8 @@ fn find_partition_entries( for column in column_defs { let column_name = &column.name; let data_type = ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?, + ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone()) + .context(ColumnDataTypeSnafu)?, ); column_name_and_type.push((column_name, data_type)); } diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index f7c64d8ba117..ed0eaa2cc571 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -117,7 +117,11 @@ impl<'a> SplitReadRowHelper<'a> { .iter() .map(|idx| { idx.as_ref().map_or(Value::Null, |idx| { - helper::pb_value_to_value_ref(&row.values[*idx]).into() + helper::pb_value_to_value_ref( + &row.values[*idx], + &self.schema[*idx].datatype_extension, + ) + .into() }) }) .collect() @@ -144,16 +148,19 @@ mod tests { column_name: "id".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "name".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "age".to_string(), datatype: ColumnDataType::Uint32 as i32, semantic_type: SemanticType::Field as i32, + ..Default::default() }, ]; let rows = vec![ diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 6e52f7139d81..6ecbaa4e33ff 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -302,31 +302,37 @@ fn build_insert_column_schemas() -> Vec { column_name: "schema".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "name".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "engine".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "script".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Field.into(), + ..Default::default() }, PbColumnSchema { column_name: "greptime_timestamp".to_string(), datatype: ColumnDataType::TimestampMillisecond.into(), semantic_type: SemanticType::Timestamp.into(), + ..Default::default() }, PbColumnSchema { column_name: "gmt_modified".to_string(), datatype: ColumnDataType::TimestampMillisecond.into(), semantic_type: SemanticType::Field.into(), + ..Default::default() }, ] } @@ -358,7 +364,9 @@ pub fn build_scripts_schema() -> RawSchema { let cs = ColumnSchema::new( c.column_name, // Safety: the type always exists - ColumnDataTypeWrapper::try_new(c.datatype).unwrap().into(), + ColumnDataTypeWrapper::try_new(c.datatype, c.datatype_extension) + .unwrap() + .into(), false, ); if c.semantic_type == SemanticType::Timestamp as i32 { diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 955bda833bcc..a0a4725fa861 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -577,6 +577,7 @@ mod tests { column_name: k.to_string(), datatype: t as i32, semantic_type: s as i32, + ..Default::default() }) .collect() } diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 9dfe92738706..b4daad3ecdac 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -215,6 +215,7 @@ fn write_by_semantic_type( column_name: name.to_string(), datatype: datatype as i32, semantic_type: semantic_type as i32, + ..Default::default() }); one_row.push(value.into()); } else { @@ -269,6 +270,7 @@ pub fn write_ts_precision( column_name: name, datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, + ..Default::default() }); one_row.push(ValueData::TimestampMillisecondValue(ts).into()) } else { diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index eae6551c30e1..e9594aac8e7a 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -107,6 +107,16 @@ fn parse_string_to_value( .fail() } } + ConcreteDataType::Decimal128(_) => { + if let Ok(val) = common_decimal::Decimal128::from_str(&s) { + Ok(Value::Decimal128(val)) + } else { + ParseSqlValueSnafu { + msg: format!("Fail to parse number {s} to Decimal128 value"), + } + .fail() + } + } _ => { unreachable!() } @@ -146,7 +156,19 @@ macro_rules! parse_number_to_value { let n = parse_sql_number::($n)?; Ok(Value::Timestamp(Timestamp::new(n, t.unit()))) }, - // TODO(QuenKar): parse decimal128 string with precision and scale + // TODO(QuenKar): This could need to be optimized + // if this from_str function is slow, + // we can implement parse decimal string with precision and scale manually. + ConcreteDataType::Decimal128(_) => { + if let Ok(val) = common_decimal::Decimal128::from_str($n) { + Ok(Value::Decimal128(val)) + } else { + ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, invalid column type: {:?}", + $n, $data_type) + }.fail() + } + } _ => ParseSqlValueSnafu { msg: format!("Fail to parse number {}, invalid column type: {:?}", @@ -356,18 +378,20 @@ pub fn sql_column_def_to_grpc_column_def(col: &ColumnDef) -> Result for AddColumn { })?; let data_type = column_def.data_type; - let data_type = ColumnDataTypeWrapper::try_new(data_type) + let data_type_ext = column_def.datatype_extension.clone(); + let data_type = ColumnDataTypeWrapper::try_new(data_type, data_type_ext) .map_err(|_| { InvalidRawRegionRequestSnafu { err: format!("unknown raw column datatype: {data_type}"), @@ -313,6 +314,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Tag as _, comment: String::new(), + ..Default::default() }), column_id: 1, }), @@ -329,6 +331,7 @@ mod tests { .unwrap(), semantic_type: SemanticType::Field as _, comment: String::new(), + ..Default::default() }), column_id: 2, }), diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 123e864dd553..ded795861476 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -340,6 +340,7 @@ CREATE TABLE {table_name} ( null_mask: vec![32, 0], semantic_type: SemanticType::Tag as i32, datatype: ColumnDataType::Int32 as i32, + ..Default::default() }, Column { column_name: "b".to_string(), @@ -573,6 +574,7 @@ CREATE TABLE {table_name} ( null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Int32 as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), @@ -611,6 +613,7 @@ CREATE TABLE {table_name} ( null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::String as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), @@ -743,6 +746,7 @@ CREATE TABLE {table_name} ( null_mask: vec![4], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 3d766a5a37f0..a0d19908f503 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -267,6 +267,7 @@ fn expect_data() -> (Column, Column, Column, Column) { null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let expected_mem_col = Column { column_name: "memory".to_string(), @@ -277,6 +278,7 @@ fn expect_data() -> (Column, Column, Column, Column) { null_mask: vec![4], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let expected_ts_col = Column { column_name: "ts".to_string(),