From a12797c0e4290e0b066bf6310183187932c307fe Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:03:37 +0800 Subject: [PATCH 01/12] refactor: ColumnDataTypeWrapper --- benchmarks/src/bin/nyc-taxi.rs | 20 + src/api/src/helper.rs | 491 ++++++++++++------ src/api/src/v1/column_def.rs | 5 +- src/client/examples/logical.rs | 3 + src/common/grpc-expr/src/alter.rs | 3 + src/common/grpc-expr/src/delete.rs | 8 +- src/common/grpc-expr/src/insert.rs | 53 +- src/common/grpc-expr/src/util.rs | 2 + src/common/grpc/src/writer.rs | 48 +- src/common/meta/src/ddl/create_table.rs | 1 + src/meta-srv/src/procedure/tests.rs | 12 +- src/metric-engine/src/engine/put.rs | 13 +- src/metric-engine/src/metadata_region.rs | 3 + src/metric-engine/src/test_util.rs | 18 +- src/mito2/src/engine/alter_test.rs | 1 + src/mito2/src/error.rs | 11 + src/mito2/src/memtable/key_values.rs | 1 + src/mito2/src/memtable/time_series.rs | 4 +- src/mito2/src/request.rs | 47 +- src/mito2/src/test_util.rs | 9 +- src/mito2/src/wal.rs | 2 + src/operator/src/expr_factory.rs | 11 +- src/operator/src/req_convert/common.rs | 26 +- .../src/req_convert/delete/table_to_region.rs | 1 + src/operator/src/req_convert/insert.rs | 11 +- .../src/req_convert/insert/stmt_to_region.rs | 15 +- .../src/req_convert/insert/table_to_region.rs | 1 + src/operator/src/statement/ddl.rs | 3 +- src/partition/src/splitter.rs | 3 + src/script/src/table.rs | 10 +- src/servers/src/prom_store.rs | 1 + src/servers/src/row_writer.rs | 2 + src/store-api/src/metadata.rs | 6 +- src/store-api/src/region_request.rs | 1 + src/store-api/src/storage/requests.rs | 5 +- tests-integration/src/grpc.rs | 4 + tests-integration/tests/grpc.rs | 2 + 37 files changed, 588 insertions(+), 269 deletions(-) diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index b30989625f73..678b5321c00a 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -152,6 +152,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec, u32) { .unwrap_or_default(), datatype: datatype.into(), semantic_type: semantic_type as i32, + ..Default::default() }; columns.push(column); } @@ -266,6 +267,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tpep_pickup_datetime".to_string(), @@ -274,6 +276,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tpep_dropoff_datetime".to_string(), @@ -282,6 +285,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "passenger_count".to_string(), @@ -290,6 +294,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "trip_distance".to_string(), @@ -298,6 +303,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "RatecodeID".to_string(), @@ -306,6 +312,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "store_and_fwd_flag".to_string(), @@ -314,6 +321,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "PULocationID".to_string(), @@ -322,6 +330,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "DOLocationID".to_string(), @@ -330,6 +339,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "payment_type".to_string(), @@ -338,6 +348,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "fare_amount".to_string(), @@ -346,6 +357,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "extra".to_string(), @@ -354,6 +366,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "mta_tax".to_string(), @@ -362,6 +375,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tip_amount".to_string(), @@ -370,6 +384,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "tolls_amount".to_string(), @@ -378,6 +393,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "improvement_surcharge".to_string(), @@ -386,6 +402,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "total_amount".to_string(), @@ -394,6 +411,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "congestion_surcharge".to_string(), @@ -402,6 +420,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "airport_fee".to_string(), @@ -410,6 +429,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "tpep_pickup_datetime".to_string(), diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 9328540bdf0a..91ffc4582e58 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_base::BitVec; +use common_decimal::Decimal128; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::TimeUnit; @@ -26,47 +27,60 @@ use datatypes::types::{ }; use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, - Float64Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, - IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, - UInt64Vector, VectorRef, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, Float32Vector, Float64Vector, Int32Vector, Int64Vector, + IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, PrimitiveVector, + StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, + TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, VectorRef, }; +use greptime_proto::v1::column_data_type_extension::Ext; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; -use greptime_proto::v1::{self, DdlRequest, IntervalMonthDayNano, QueryRequest, Row, SemanticType}; +use greptime_proto::v1::{ + self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, + SemanticType, +}; +use paste::paste; use snafu::prelude::*; use crate::error::{self, Result}; use crate::v1::column::Values; use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; - -#[derive(Debug, PartialEq, Eq)] -pub struct ColumnDataTypeWrapper(ColumnDataType); +#[derive(Debug, PartialEq)] +pub struct ColumnDataTypeWrapper { + datatype: ColumnDataType, + datatype_ext: Option, +} impl ColumnDataTypeWrapper { - pub fn try_new(datatype: i32) -> Result { + pub fn try_new(datatype: i32, datatype_ext: Option) -> Result { let datatype = ColumnDataType::try_from(datatype) .context(error::UnknownColumnDataTypeSnafu { datatype })?; - Ok(Self(datatype)) + Ok(Self { + datatype, + datatype_ext, + }) } - pub fn new(datatype: ColumnDataType) -> Self { - Self(datatype) + pub fn new(datatype: ColumnDataType, datatype_ext: Option) -> Self { + Self { + datatype, + datatype_ext, + } } - pub fn datatype(&self) -> ColumnDataType { - self.0 + pub fn datatype(&self) -> (ColumnDataType, Option) { + (self.datatype, self.datatype_ext.clone()) } } impl From for ConcreteDataType { - fn from(datatype: ColumnDataTypeWrapper) -> Self { - match datatype.0 { + fn from(datatype_wrapper: ColumnDataTypeWrapper) -> Self { + match datatype_wrapper.datatype { ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(), ColumnDataType::Int8 => ConcreteDataType::int8_datatype(), ColumnDataType::Int16 => ConcreteDataType::int16_datatype(), @@ -109,6 +123,85 @@ impl From for ConcreteDataType { ConcreteDataType::duration_microsecond_datatype() } ColumnDataType::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), + ColumnDataType::Decimal128 => { + if let Some(Ext::DecimalType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.ext.as_ref()) + { + ConcreteDataType::decimal128_datatype(d.precision as u8, d.scale as i8) + } else { + ConcreteDataType::decimal128_default_datatype() + } + } + } + } +} + +macro_rules! impl_column_type_functions { + ($($Type: ident), +) => { + paste! { + impl ColumnDataTypeWrapper { + $( + pub fn [<$Type:lower _datatype>]() -> ColumnDataTypeWrapper { + ColumnDataTypeWrapper { + datatype: ColumnDataType::$Type, + datatype_ext: None, + } + } + )+ + } + } + } +} + +macro_rules! impl_column_type_functions_with_snake { + ($($TypeName: ident), +) => { + paste!{ + impl ColumnDataTypeWrapper { + $( + pub fn [<$TypeName:snake _datatype>]() -> ColumnDataTypeWrapper { + ColumnDataTypeWrapper { + datatype: ColumnDataType::$TypeName, + datatype_ext: None, + } + } + )+ + } + } + }; +} + +impl_column_type_functions!( + Boolean, Uint8, Uint16, Uint32, Uint64, Int8, Int16, Int32, Int64, Float32, Float64, Binary, + Date, Datetime, String +); + +impl_column_type_functions_with_snake!( + TimestampSecond, + TimestampMillisecond, + TimestampMicrosecond, + TimestampNanosecond, + TimeSecond, + TimeMillisecond, + TimeMicrosecond, + TimeNanosecond, + IntervalYearMonth, + IntervalDayTime, + IntervalMonthDayNano, + DurationSecond, + DurationMillisecond, + DurationMicrosecond, + DurationNanosecond +); + +impl ColumnDataTypeWrapper { + pub fn decimal128_datatype(precision: i32, scale: i32) -> Self { + ColumnDataTypeWrapper { + datatype: ColumnDataType::Decimal128, + datatype_ext: Some(ColumnDataTypeExtension { + ext: Some(Ext::DecimalType(DecimalTypeExtension { precision, scale })), + }), } } } @@ -117,7 +210,7 @@ impl TryFrom for ColumnDataTypeWrapper { type Error = error::Error; fn try_from(datatype: ConcreteDataType) -> Result { - let datatype = ColumnDataTypeWrapper(match datatype { + let column_datatype = match datatype { ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, ConcreteDataType::Int8(_) => ColumnDataType::Int8, ConcreteDataType::Int16(_) => ColumnDataType::Int16, @@ -156,14 +249,30 @@ impl TryFrom for ColumnDataTypeWrapper { DurationType::Microsecond(_) => ColumnDataType::DurationMicrosecond, DurationType::Nanosecond(_) => ColumnDataType::DurationNanosecond, }, + ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, ConcreteDataType::Null(_) | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + | ConcreteDataType::Dictionary(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } - }); - Ok(datatype) + }; + let datatype_extension = match column_datatype { + ColumnDataType::Decimal128 => { + datatype + .as_decimal128() + .map(|decimal_type| ColumnDataTypeExtension { + ext: Some(Ext::DecimalType(DecimalTypeExtension { + precision: decimal_type.precision() as i32, + scale: decimal_type.scale() as i32, + })), + }) + } + _ => None, + }; + Ok(Self { + datatype: column_datatype, + datatype_ext: datatype_extension, + }) } } @@ -289,6 +398,10 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values duration_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::Decimal128 => Values { + decimal128_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } @@ -342,7 +455,8 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { TimeUnit::Microsecond => values.duration_microsecond_values.push(val.value()), TimeUnit::Nanosecond => values.duration_nanosecond_values.push(val.value()), }, - Value::List(_) | Value::Decimal128(_) => unreachable!(), + Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)), + Value::List(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); } @@ -382,16 +496,27 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { } /// Converts an i128 value to google protobuf type [IntervalMonthDayNano]. -pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano { +pub fn convert_i128_to_interval(v: i128) -> v1::IntervalMonthDayNano { let interval = Interval::from_i128(v); let (months, days, nanoseconds) = interval.to_month_day_nano(); - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months, days, nanoseconds, } } +/// Convert common decimal128 to grpc decimal128. +pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 { + let (hi, lo, precision, scale) = v.to_pb_decimal128(); + v1::Decimal128 { + hi, + lo, + precision, + scale, + } +} + pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { let Some(value) = &value.value_data else { return ValueRef::Null; @@ -437,6 +562,12 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::DurationMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), ValueData::DurationMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), + ValueData::Decimal128Value(v) => ValueRef::Decimal128(Decimal128::from_pb_decimal128( + v.hi, + v.lo, + v.precision, + v.scale, + )), } } @@ -523,10 +654,14 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> values.duration_nanosecond_values, )), }, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + // When should I check precision and scale? + ConcreteDataType::Decimal128(_) => Arc::new(Decimal128Vector::from_values( + values + .decimal128_values + .iter() + .map(|x| Decimal128::from_pb_decimal128(x.hi, x.lo, x.precision, x.scale).into()), + )), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } } @@ -696,10 +831,19 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => { + ConcreteDataType::Decimal128(_) => values + .decimal128_values + .into_iter() + .map(|v| { + Value::Decimal128(Decimal128::from_pb_decimal128( + v.hi, + v.lo, + v.precision, + v.scale, + )) + }) + .collect(), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } } @@ -711,12 +855,16 @@ pub fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool } /// Returns true if the pb type value is valid. -pub fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { - let Ok(column_type) = ColumnDataType::try_from(type_value) else { +pub fn is_column_type_value_eq( + type_value: i32, + type_extension: Option, + expect_type: &ConcreteDataType, +) -> bool { + let Ok(column_type_wrapper) = ColumnDataTypeWrapper::try_new(type_value, type_extension) else { return false; }; - is_column_type_eq(column_type, expect_type) + is_column_type_eq(column_type_wrapper, expect_type) } /// Convert value into proto's value. @@ -823,7 +971,18 @@ pub fn to_proto_value(value: Value) -> Option { value_data: Some(ValueData::DurationNanosecondValue(v.value())), }, }, - Value::List(_) | Value::Decimal128(_) => return None, + Value::Decimal128(v) => { + let (hi, lo, precision, scale) = v.to_pb_decimal128(); + v1::Value { + value_data: Some(ValueData::Decimal128Value(v1::Decimal128 { + hi, + lo, + precision, + scale, + })), + } + } + Value::List(_) => return None, }; Some(proto_value) @@ -832,96 +991,59 @@ pub fn to_proto_value(value: Value) -> Option { /// Returns the [ColumnDataType] of the value. /// /// If value is null, returns `None`. -pub fn proto_value_type(value: &v1::Value) -> Option { +pub fn proto_value_type(value: &v1::Value) -> Option { let value_type = match value.value_data.as_ref()? { - ValueData::I8Value(_) => ColumnDataType::Int8, - ValueData::I16Value(_) => ColumnDataType::Int16, - ValueData::I32Value(_) => ColumnDataType::Int32, - ValueData::I64Value(_) => ColumnDataType::Int64, - ValueData::U8Value(_) => ColumnDataType::Uint8, - ValueData::U16Value(_) => ColumnDataType::Uint16, - ValueData::U32Value(_) => ColumnDataType::Uint32, - ValueData::U64Value(_) => ColumnDataType::Uint64, - ValueData::F32Value(_) => ColumnDataType::Float32, - ValueData::F64Value(_) => ColumnDataType::Float64, - ValueData::BoolValue(_) => ColumnDataType::Boolean, - ValueData::BinaryValue(_) => ColumnDataType::Binary, - ValueData::StringValue(_) => ColumnDataType::String, - ValueData::DateValue(_) => ColumnDataType::Date, - ValueData::DatetimeValue(_) => ColumnDataType::Datetime, - ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond, - ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond, - ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, - ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond, - ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond, - ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, - ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, - ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, - ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth, - ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime, - ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano, - ValueData::DurationSecondValue(_) => ColumnDataType::DurationSecond, - ValueData::DurationMillisecondValue(_) => ColumnDataType::DurationMillisecond, - ValueData::DurationMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, - ValueData::DurationNanosecondValue(_) => ColumnDataType::DurationNanosecond, - }; - Some(value_type) -} - -/// Convert [ConcreteDataType] to [ColumnDataType]. -pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option { - let column_data_type = match data_type { - ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, - ConcreteDataType::Int8(_) => ColumnDataType::Int8, - ConcreteDataType::Int16(_) => ColumnDataType::Int16, - ConcreteDataType::Int32(_) => ColumnDataType::Int32, - ConcreteDataType::Int64(_) => ColumnDataType::Int64, - ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, - ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, - ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, - ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, - ConcreteDataType::Float32(_) => ColumnDataType::Float32, - ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) => ColumnDataType::Binary, - ConcreteDataType::String(_) => ColumnDataType::String, - ConcreteDataType::Date(_) => ColumnDataType::Date, - ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, - ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { - ColumnDataType::TimestampMillisecond + ValueData::I8Value(_) => ColumnDataTypeWrapper::int8_datatype(), + ValueData::I16Value(_) => ColumnDataTypeWrapper::int16_datatype(), + ValueData::I32Value(_) => ColumnDataTypeWrapper::int32_datatype(), + ValueData::I64Value(_) => ColumnDataTypeWrapper::int64_datatype(), + ValueData::U8Value(_) => ColumnDataTypeWrapper::uint8_datatype(), + ValueData::U16Value(_) => ColumnDataTypeWrapper::uint16_datatype(), + ValueData::U32Value(_) => ColumnDataTypeWrapper::uint32_datatype(), + ValueData::U64Value(_) => ColumnDataTypeWrapper::uint64_datatype(), + ValueData::F32Value(_) => ColumnDataTypeWrapper::float32_datatype(), + ValueData::F64Value(_) => ColumnDataTypeWrapper::float64_datatype(), + ValueData::BoolValue(_) => ColumnDataTypeWrapper::boolean_datatype(), + ValueData::BinaryValue(_) => ColumnDataTypeWrapper::binary_datatype(), + ValueData::StringValue(_) => ColumnDataTypeWrapper::string_datatype(), + ValueData::DateValue(_) => ColumnDataTypeWrapper::date_datatype(), + ValueData::DatetimeValue(_) => ColumnDataTypeWrapper::datetime_datatype(), + ValueData::TimestampSecondValue(_) => ColumnDataTypeWrapper::timestamp_second_datatype(), + ValueData::TimestampMillisecondValue(_) => { + ColumnDataTypeWrapper::timestamp_millisecond_datatype() + } + ValueData::TimestampMicrosecondValue(_) => { + ColumnDataTypeWrapper::timestamp_microsecond_datatype() } - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { - ColumnDataType::TimestampMicrosecond + ValueData::TimestampNanosecondValue(_) => { + ColumnDataTypeWrapper::timestamp_nanosecond_datatype() } - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { - ColumnDataType::TimestampNanosecond + ValueData::TimeSecondValue(_) => ColumnDataTypeWrapper::time_second_datatype(), + ValueData::TimeMillisecondValue(_) => ColumnDataTypeWrapper::time_millisecond_datatype(), + ValueData::TimeMicrosecondValue(_) => ColumnDataTypeWrapper::time_microsecond_datatype(), + ValueData::TimeNanosecondValue(_) => ColumnDataTypeWrapper::time_nanosecond_datatype(), + ValueData::IntervalYearMonthValues(_) => { + ColumnDataTypeWrapper::interval_year_month_datatype() } - ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, - ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, - ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, - ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, - ConcreteDataType::Duration(DurationType::Second(_)) => ColumnDataType::DurationSecond, - ConcreteDataType::Duration(DurationType::Millisecond(_)) => { - ColumnDataType::DurationMillisecond + ValueData::IntervalDayTimeValues(_) => ColumnDataTypeWrapper::interval_day_time_datatype(), + ValueData::IntervalMonthDayNanoValues(_) => { + ColumnDataTypeWrapper::interval_month_day_nano_datatype() } - ConcreteDataType::Duration(DurationType::Microsecond(_)) => { - ColumnDataType::DurationMicrosecond + ValueData::DurationSecondValue(_) => ColumnDataTypeWrapper::duration_second_datatype(), + ValueData::DurationMillisecondValue(_) => { + ColumnDataTypeWrapper::duration_millisecond_datatype() } - ConcreteDataType::Duration(DurationType::Nanosecond(_)) => { - ColumnDataType::DurationNanosecond + ValueData::DurationMicrosecondValue(_) => { + ColumnDataTypeWrapper::duration_microsecond_datatype() } - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => ColumnDataType::IntervalYearMonth, - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => { - ColumnDataType::IntervalMonthDayNano + ValueData::DurationNanosecondValue(_) => { + ColumnDataTypeWrapper::duration_nanosecond_datatype() + } + ValueData::Decimal128Value(v) => { + ColumnDataTypeWrapper::decimal128_datatype(v.precision, v.scale) } - ConcreteDataType::Interval(IntervalType::DayTime(_)) => ColumnDataType::IntervalDayTime, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Decimal128(_) => return None, }; - - Some(column_data_type) + Some(value_type) } pub fn vectors_to_rows<'a>( @@ -982,14 +1104,23 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Microsecond => ValueData::DurationMicrosecondValue(v.value()), TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), - Value::List(_) | Value::Decimal128(_) => unreachable!(), + Value::Decimal128(v) => { + let (hi, lo, precision, scale) = v.to_pb_decimal128(); + Some(ValueData::Decimal128Value(v1::Decimal128 { + hi, + lo, + precision, + scale, + })) + } + Value::List(_) => unreachable!(), }, } } /// Returns true if the column type is equal to expected type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - if let Some(expect) = to_column_data_type(expect_type) { +fn is_column_type_eq(column_type: ColumnDataTypeWrapper, expect_type: &ConcreteDataType) -> bool { + if let Ok(expect) = ColumnDataTypeWrapper::try_from(expect_type.clone()) { column_type == expect } else { false @@ -1095,178 +1226,178 @@ mod tests { fn test_concrete_datatype_from_column_datatype() { assert_eq!( ConcreteDataType::boolean_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Boolean).into() + ColumnDataTypeWrapper::boolean_datatype().into() ); assert_eq!( ConcreteDataType::int8_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int8).into() + ColumnDataTypeWrapper::int8_datatype().into() ); assert_eq!( ConcreteDataType::int16_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int16).into() + ColumnDataTypeWrapper::int16_datatype().into() ); assert_eq!( ConcreteDataType::int32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int32).into() + ColumnDataTypeWrapper::int32_datatype().into() ); assert_eq!( ConcreteDataType::int64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Int64).into() + ColumnDataTypeWrapper::int64_datatype().into() ); assert_eq!( ConcreteDataType::uint8_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint8).into() + ColumnDataTypeWrapper::uint8_datatype().into() ); assert_eq!( ConcreteDataType::uint16_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint16).into() + ColumnDataTypeWrapper::uint16_datatype().into() ); assert_eq!( ConcreteDataType::uint32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint32).into() + ColumnDataTypeWrapper::uint32_datatype().into() ); assert_eq!( ConcreteDataType::uint64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Uint64).into() + ColumnDataTypeWrapper::uint64_datatype().into() ); assert_eq!( ConcreteDataType::float32_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Float32).into() + ColumnDataTypeWrapper::float32_datatype().into() ); assert_eq!( ConcreteDataType::float64_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Float64).into() + ColumnDataTypeWrapper::float64_datatype().into() ); assert_eq!( ConcreteDataType::binary_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Binary).into() + ColumnDataTypeWrapper::binary_datatype().into() ); assert_eq!( ConcreteDataType::string_datatype(), - ColumnDataTypeWrapper(ColumnDataType::String).into() + ColumnDataTypeWrapper::string_datatype().into() ); assert_eq!( ConcreteDataType::date_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Date).into() + ColumnDataTypeWrapper::date_datatype().into() ); assert_eq!( ConcreteDataType::datetime_datatype(), - ColumnDataTypeWrapper(ColumnDataType::Datetime).into() + ColumnDataTypeWrapper::datetime_datatype().into() ); assert_eq!( ConcreteDataType::timestamp_millisecond_datatype(), - ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into() + ColumnDataTypeWrapper::timestamp_millisecond_datatype().into() ); assert_eq!( ConcreteDataType::time_datatype(TimeUnit::Millisecond), - ColumnDataTypeWrapper(ColumnDataType::TimeMillisecond).into() + ColumnDataTypeWrapper::time_millisecond_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::DayTime), - ColumnDataTypeWrapper(ColumnDataType::IntervalDayTime).into() + ColumnDataTypeWrapper::interval_day_time_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::YearMonth), - ColumnDataTypeWrapper(ColumnDataType::IntervalYearMonth).into() + ColumnDataTypeWrapper::interval_year_month_datatype().into() ); assert_eq!( ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano), - ColumnDataTypeWrapper(ColumnDataType::IntervalMonthDayNano).into() + ColumnDataTypeWrapper::interval_month_day_nano_datatype().into() ); assert_eq!( ConcreteDataType::duration_millisecond_datatype(), - ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond).into() + ColumnDataTypeWrapper::duration_millisecond_datatype().into() ) } #[test] fn test_column_datatype_from_concrete_datatype() { assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Boolean), + ColumnDataTypeWrapper::boolean_datatype(), ConcreteDataType::boolean_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int8), + ColumnDataTypeWrapper::int8_datatype(), ConcreteDataType::int8_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int16), + ColumnDataTypeWrapper::int16_datatype(), ConcreteDataType::int16_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int32), + ColumnDataTypeWrapper::int32_datatype(), ConcreteDataType::int32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Int64), + ColumnDataTypeWrapper::int64_datatype(), ConcreteDataType::int64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint8), + ColumnDataTypeWrapper::uint8_datatype(), ConcreteDataType::uint8_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint16), + ColumnDataTypeWrapper::uint16_datatype(), ConcreteDataType::uint16_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint32), + ColumnDataTypeWrapper::uint32_datatype(), ConcreteDataType::uint32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Uint64), + ColumnDataTypeWrapper::uint64_datatype(), ConcreteDataType::uint64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Float32), + ColumnDataTypeWrapper::float32_datatype(), ConcreteDataType::float32_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Float64), + ColumnDataTypeWrapper::float64_datatype(), ConcreteDataType::float64_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Binary), + ColumnDataTypeWrapper::binary_datatype(), ConcreteDataType::binary_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::String), + ColumnDataTypeWrapper::string_datatype(), ConcreteDataType::string_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Date), + ColumnDataTypeWrapper::date_datatype(), ConcreteDataType::date_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::Datetime), + ColumnDataTypeWrapper::datetime_datatype(), ConcreteDataType::datetime_datatype().try_into().unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond), + ColumnDataTypeWrapper::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_millisecond_datatype() .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalYearMonth), + ColumnDataTypeWrapper::interval_year_month_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::YearMonth) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalDayTime), + ColumnDataTypeWrapper::interval_day_time_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::DayTime) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::IntervalMonthDayNano), + ColumnDataTypeWrapper::interval_month_day_nano_datatype(), ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano) .try_into() .unwrap() ); assert_eq!( - ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond), + ColumnDataTypeWrapper::duration_millisecond_datatype(), ConcreteDataType::duration_millisecond_datatype() .try_into() .unwrap() @@ -1298,6 +1429,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1339,6 +1471,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1380,6 +1513,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(IntervalYearMonthVector::from_vec(vec![1, 2, 3])); @@ -1424,6 +1558,7 @@ mod tests { }), null_mask: vec![], datatype: 0, + ..Default::default() }; let vector = Arc::new(DurationNanosecondVector::from_vec(vec![1, 2, 3])); @@ -1468,6 +1603,7 @@ mod tests { }), null_mask: vec![2], datatype: ColumnDataType::Boolean as i32, + ..Default::default() }; let row_count = 4; @@ -1625,17 +1761,17 @@ mod tests { &ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)), Values { interval_month_day_nano_values: vec![ - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 1, days: 2, nanoseconds: 3, }, - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 5, days: 6, nanoseconds: 7, }, - IntervalMonthDayNano { + v1::IntervalMonthDayNano { months: 9, days: 10, nanoseconds: 11, @@ -1867,4 +2003,25 @@ mod tests { assert_eq!(values[6], ValueData::DateValue(30)); assert_eq!(values[7], ValueData::StringValue("c".to_string())); } + + #[test] + fn test_is_column_type_value_eq() { + // test column type eq + let column1 = Column { + column_name: "test".to_string(), + semantic_type: 0, + values: Some(Values { + bool_values: vec![false, true, true], + ..Default::default() + }), + null_mask: vec![2], + datatype: ColumnDataType::Boolean as i32, + datatype_extension: None, + }; + assert!(is_column_type_value_eq( + column1.datatype, + column1.datatype_extension, + &ConcreteDataType::boolean_datatype(), + )); + } } diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 7a812d005fae..4a077d3b5451 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -22,7 +22,10 @@ use crate::helper::ColumnDataTypeWrapper; use crate::v1::ColumnDef; pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { - let data_type = ColumnDataTypeWrapper::try_new(column_def.data_type)?; + let data_type = ColumnDataTypeWrapper::try_new( + column_def.data_type, + column_def.datatype_extension.clone(), + )?; let constraint = if column_def.default_constraint.is_empty() { None diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index ca7d3ae96ba3..ec4c11cdd9f9 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -46,6 +46,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "key".to_string(), @@ -54,6 +55,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, ColumnDef { name: "value".to_string(), @@ -62,6 +64,7 @@ async fn run() { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "timestamp".to_string(), diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 462eefde1a27..532f9cf15c48 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -158,6 +158,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: None, }], @@ -199,6 +200,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: Some(Location { location_type: LocationType::First.into(), @@ -213,6 +215,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), location: Some(Location { location_type: LocationType::After.into(), diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs index d272b6aa0bc0..ff737fcdfc60 100644 --- a/src/common/grpc-expr/src/delete.rs +++ b/src/common/grpc-expr/src/delete.rs @@ -36,14 +36,16 @@ pub fn to_table_delete_request( values, null_mask, datatype, + datatype_extension, .. } in request.key_columns { let Some(values) = values else { continue }; - let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype) - .context(ColumnDataTypeSnafu)? - .into(); + let datatype: ConcreteDataType = + ColumnDataTypeWrapper::try_new(datatype, datatype_extension) + .context(ColumnDataTypeSnafu)? + .into(); let vector = add_values_to_builder(datatype, values, row_count, null_mask)?; ensure!( diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 5b173b6fdc67..746189ee2b53 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -119,7 +119,7 @@ mod tests { nullable: bool, ) -> error::Result { let datatype_wrapper = - ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?; + ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?; Ok(ColumnSchema::new( column_name, @@ -170,7 +170,8 @@ mod tests { .iter() .find(|c| c.name == "host") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -184,7 +185,8 @@ mod tests { .iter() .find(|c| c.name == "cpu") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -198,7 +200,8 @@ mod tests { .iter() .find(|c| c.name == "memory") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -212,7 +215,8 @@ mod tests { .iter() .find(|c| c.name == "time") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -226,7 +230,8 @@ mod tests { .iter() .find(|c| c.name == "interval") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -240,7 +245,8 @@ mod tests { .iter() .find(|c| c.name == "duration") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -254,7 +260,8 @@ mod tests { .iter() .find(|c| c.name == "ts") .unwrap() - .data_type + .data_type, + None ) .unwrap() ) @@ -284,8 +291,11 @@ mod tests { assert_eq!( ConcreteDataType::string_datatype(), ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(host_column.column_def.as_ref().unwrap().data_type) - .unwrap() + ColumnDataTypeWrapper::try_new( + host_column.column_def.as_ref().unwrap().data_type, + None + ) + .unwrap() ) ); @@ -294,7 +304,8 @@ mod tests { ConcreteDataType::float64_datatype(), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - memory_column.column_def.as_ref().unwrap().data_type + memory_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -304,8 +315,11 @@ mod tests { assert_eq!( ConcreteDataType::time_datatype(TimeUnit::Millisecond), ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(time_column.column_def.as_ref().unwrap().data_type) - .unwrap() + ColumnDataTypeWrapper::try_new( + time_column.column_def.as_ref().unwrap().data_type, + None + ) + .unwrap() ) ); @@ -314,7 +328,8 @@ mod tests { ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - interval_column.column_def.as_ref().unwrap().data_type + interval_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -326,7 +341,8 @@ mod tests { ConcreteDataType::duration_millisecond_datatype(), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - duration_column.column_def.as_ref().unwrap().data_type + duration_column.column_def.as_ref().unwrap().data_type, + None ) .unwrap() ) @@ -360,6 +376,7 @@ mod tests { values: Some(host_vals), null_mask: vec![0], datatype: ColumnDataType::String as i32, + ..Default::default() }; let cpu_vals = Values { @@ -372,6 +389,7 @@ mod tests { values: Some(cpu_vals), null_mask: vec![2], datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let mem_vals = Values { @@ -384,6 +402,7 @@ mod tests { values: Some(mem_vals), null_mask: vec![1], datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let time_vals = Values { @@ -396,6 +415,7 @@ mod tests { values: Some(time_vals), null_mask: vec![0], datatype: ColumnDataType::TimeMillisecond as i32, + ..Default::default() }; let interval1 = IntervalMonthDayNano { @@ -418,6 +438,7 @@ mod tests { values: Some(interval_vals), null_mask: vec![0], datatype: ColumnDataType::IntervalMonthDayNano as i32, + ..Default::default() }; let duration_vals = Values { @@ -430,6 +451,7 @@ mod tests { values: Some(duration_vals), null_mask: vec![0], datatype: ColumnDataType::DurationMillisecond as i32, + ..Default::default() }; let ts_vals = Values { @@ -442,6 +464,7 @@ mod tests { values: Some(ts_vals), null_mask: vec![0], datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() }; ( diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index 337a0a16e6e2..f7068bc875d8 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -121,6 +121,7 @@ pub fn build_create_table_expr( default_constraint: vec![], semantic_type, comment: String::new(), + ..Default::default() }; column_defs.push(column_def); } @@ -161,6 +162,7 @@ pub fn extract_new_columns( default_constraint: vec![], semantic_type: expr.semantic_type, comment: String::new(), + ..Default::default() }); AddColumn { column_def, diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 7efa3dfd5cad..3a8e9238287e 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use api::helper::values_with_capacity; -use api::v1::{Column, ColumnDataType, SemanticType}; +use api::v1::{Column, ColumnDataType, ColumnDataTypeExtension, SemanticType}; use common_base::BitVec; use common_time::timestamp::TimeUnit; use snafu::ensure; @@ -50,6 +50,7 @@ impl LinesWriter { column_name, ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, + None, ); ensure!( column.datatype == ColumnDataType::TimestampMillisecond as i32, @@ -69,7 +70,8 @@ impl LinesWriter { } pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag); + let (idx, column) = + self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag, None); ensure!( column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { @@ -86,8 +88,12 @@ impl LinesWriter { } pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Uint64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Uint64 as i32, TypeMismatchSnafu { @@ -104,8 +110,12 @@ impl LinesWriter { } pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Int64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Int64 as i32, TypeMismatchSnafu { @@ -122,8 +132,12 @@ impl LinesWriter { } pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Float64, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Float64 as i32, TypeMismatchSnafu { @@ -140,8 +154,12 @@ impl LinesWriter { } pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::String, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::String, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { @@ -158,8 +176,12 @@ impl LinesWriter { } pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field); + let (idx, column) = self.mut_column( + column_name, + ColumnDataType::Boolean, + SemanticType::Field, + None, + ); ensure!( column.datatype == ColumnDataType::Boolean as i32, TypeMismatchSnafu { @@ -201,6 +223,7 @@ impl LinesWriter { column_name: &str, datatype: ColumnDataType, semantic_type: SemanticType, + datatype_extension: Option, ) -> (usize, &mut Column) { let column_names = &mut self.column_name_index; let column_idx = match column_names.get(column_name) { @@ -218,6 +241,7 @@ impl LinesWriter { values: Some(values_with_capacity(datatype, to_insert)), datatype: datatype as i32, null_mask: Vec::default(), + datatype_extension, }); let _ = column_names.insert(column_name.to_string(), new_idx); new_idx diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 54632e9b3f05..85b4bb8d23da 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -132,6 +132,7 @@ impl CreateTableProcedure { default_constraint: c.default_constraint.clone(), semantic_type: semantic_type as i32, comment: String::new(), + datatype_extension: c.datatype_extension.clone(), }), column_id: i as u32, } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index d9ca7a7d31bc..49d09a2ad3fc 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -54,6 +54,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_tag1".to_string(), @@ -62,6 +63,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_tag2".to_string(), @@ -70,6 +72,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }, PbColumnDef { name: "my_field_column".to_string(), @@ -78,6 +81,7 @@ fn create_table_task() -> CreateTableTask { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }, ], time_index: "ts".to_string(), @@ -114,6 +118,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, comment: String::new(), + ..Default::default() }), column_id: 0, }, @@ -125,6 +130,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), column_id: 1, }, @@ -136,6 +142,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), column_id: 2, }, @@ -147,6 +154,7 @@ fn test_create_region_request_template() { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), column_id: 3, }, @@ -287,6 +295,7 @@ fn test_create_alter_region_request() { default_constraint: b"hello".to_vec(), semantic_type: SemanticType::Tag as i32, comment: String::new(), + ..Default::default() }), location: Some(AddColumnLocation { location_type: LocationType::After as i32, @@ -321,7 +330,8 @@ fn test_create_alter_region_request() { is_nullable: true, default_constraint: b"hello".to_vec(), semantic_type: SemanticType::Tag as i32, - comment: String::new() + comment: String::new(), + ..Default::default() }), column_id: 3, }), diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index dd1347e705b2..e92aabffab46 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -15,9 +15,8 @@ use std::hash::{BuildHasher, Hash, Hasher}; use ahash::RandomState; -use api::helper::to_column_data_type; use api::v1::value::ValueData; -use api::v1::{ColumnSchema, Row, Rows, SemanticType}; +use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_query::Output; use common_telemetry::{error, info}; use datatypes::data_type::ConcreteDataType; @@ -162,18 +161,16 @@ impl MetricEngineInner { // add table_name column rows.schema.push(ColumnSchema { column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), - datatype: to_column_data_type(&ConcreteDataType::uint32_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Uint32 as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); // add tsid column rows.schema.push(ColumnSchema { column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: to_column_data_type(&ConcreteDataType::uint64_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Uint64 as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); // fill internal columns diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 5cc2880cde15..3d475a0c0a61 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -276,16 +276,19 @@ impl MetadataRegion { column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), datatype: ColumnDataType::TimestampMillisecond as _, semantic_type: SemanticType::Timestamp as _, + ..Default::default() }, ColumnSchema { column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), datatype: ColumnDataType::String as _, semantic_type: SemanticType::Tag as _, + ..Default::default() }, ColumnSchema { column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), datatype: ColumnDataType::String as _, semantic_type: SemanticType::Field as _, + ..Default::default() }, ]; let rows = Rows { diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index f9d6e94ac929..690874e316d3 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -14,9 +14,8 @@ //! Utilities for testing. -use api::helper::to_column_data_type; use api::v1::value::ValueData; -use api::v1::{ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; +use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::config::MitoConfig; @@ -216,26 +215,23 @@ pub fn row_schema_with_tags(tags: &[&str]) -> Vec { let mut schema = vec![ PbColumnSchema { column_name: "greptime_timestamp".to_string(), - datatype: to_column_data_type(&ConcreteDataType::timestamp_millisecond_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, }, PbColumnSchema { column_name: "greptime_value".to_string(), - datatype: to_column_data_type(&ConcreteDataType::float64_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::Float64 as i32, semantic_type: SemanticType::Field as _, + datatype_extension: None, }, ]; for tag in tags { schema.push(PbColumnSchema { column_name: tag.to_string(), - datatype: to_column_data_type(&ConcreteDataType::string_datatype()) - .unwrap() - .into(), + datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as _, + datatype_extension: None, }); } schema diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index c3d4045df334..a7c3d19caeb5 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -220,6 +220,7 @@ async fn test_put_after_alter() { column_name: "tag_1".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }); // Put with new schema. let rows = Rows { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e9aba0bf6f8f..fbf66ac284f1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -155,6 +155,16 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to convert ConcreteDataType to ColumnDataType, reason: {}", + reason + ))] + ConvertColumnDataType { + reason: String, + source: api::error::Error, + location: Location, + }, + /// An error type to indicate that schema is changed and we need /// to fill default values again. #[snafu(display("Need to fill default value for region {}", region_id))] @@ -438,6 +448,7 @@ impl ErrorExt for Error { | InvalidMeta { .. } | InvalidRequest { .. } | FillDefault { .. } + | ConvertColumnDataType { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 37906a0c5c3f..ce29a89ee051 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -233,6 +233,7 @@ mod tests { column_name: column_name.to_string(), datatype, semantic_type, + ..Default::default() } }) .collect(); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 0289d932cd70..90ccdc6540c8 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -951,8 +951,10 @@ mod tests { column_name: c.column_schema.name.clone(), datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) .unwrap() - .datatype() as i32, + .datatype() + .0 as i32, semantic_type: c.semantic_type as i32, + ..Default::default() }) .collect(); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 260e008db248..14cb5281c174 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use api::helper::{ - is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, - to_proto_value, + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value, + ColumnDataTypeWrapper, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_query::Output; @@ -40,8 +40,8 @@ use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ - CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, - InvalidRequestSnafu, Result, + CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, + FlushRegionSnafu, InvalidRequestSnafu, Result, }; use crate::memtable::MemtableId; use crate::metrics::COMPACTION_ELAPSED_TOTAL; @@ -152,7 +152,11 @@ impl WriteRequest { if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { // Check data type. ensure!( - is_column_type_value_eq(input_col.datatype, &column.column_schema.data_type), + is_column_type_value_eq( + input_col.datatype, + input_col.datatype_extension.clone(), + &column.column_schema.data_type + ), InvalidRequestSnafu { region_id, reason: format!( @@ -248,19 +252,20 @@ impl WriteRequest { } // Insert column schema. - let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| { - InvalidRequestSnafu { - region_id: self.region_id, - reason: format!( - "no protobuf type for column {} ({:?})", - column.column_schema.name, column.column_schema.data_type - ), - } - })?; + let (datatype, datatype_ext) = + ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone()) + .context(ConvertColumnDataTypeSnafu { + reason: format!( + "no protobuf type for column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + })? + .datatype(); self.rows.schema.push(ColumnSchema { column_name: column.column_schema.name.clone(), datatype: datatype as i32, semantic_type: column.semantic_type as i32, + datatype_extension: datatype_ext, }); Ok(()) @@ -353,7 +358,8 @@ pub(crate) fn validate_proto_value( value: &Value, column_schema: &ColumnSchema, ) -> Result<()> { - if let Some(value_type) = proto_value_type(value) { + if let Some(column_type) = proto_value_type(value) { + let (value_type, value_type_ext) = column_type.datatype(); ensure!( value_type as i32 == column_schema.datatype, InvalidRequestSnafu { @@ -367,6 +373,16 @@ pub(crate) fn validate_proto_value( ), } ); + ensure!( + value_type_ext == column_schema.datatype_extension, + InvalidRequestSnafu { + region_id, + reason: format!( + "value has type extension {:?}, but column {} has type extension {:?}", + value_type_ext, column_schema.column_name, column_schema.datatype_extension, + ), + } + ); } Ok(()) @@ -715,6 +731,7 @@ mod tests { column_name: name.to_string(), datatype: data_type as i32, semantic_type: semantic_type as i32, + ..Default::default() } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e602643ff5c5..e5f24788375f 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -530,12 +530,15 @@ impl WriteBufferManager for MockWriteBufferManager { } pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) + .unwrap() + .datatype(); api::v1::ColumnSchema { column_name: metadata.column_schema.name.clone(), - datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) - .unwrap() - .datatype() as i32, + datatype: datatype as i32, semantic_type: metadata.semantic_type as i32, + datatype_extension, } } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 85216c04e7ef..2171ceb7b1bd 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -214,11 +214,13 @@ mod tests { column_name: "tag".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "ts".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, + ..Default::default() }, ]; diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index fb855f62028e..b4ddfd0cf508 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ - AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn, - DropColumns, RenameTable, SemanticType, + AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, + CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -312,14 +312,14 @@ pub fn column_schemas_to_defs( column_schemas: Vec, primary_keys: &[String], ) -> Result> { - let column_datatypes = column_schemas + let column_datatypes: Vec<(ColumnDataType, Option)> = column_schemas .iter() .map(|c| { ColumnDataTypeWrapper::try_from(c.data_type.clone()) .map(|w| w.datatype()) .context(ColumnDataTypeSnafu) }) - .collect::>>()?; + .collect::>>()?; column_schemas .iter() @@ -340,7 +340,7 @@ pub fn column_schemas_to_defs( Ok(api::v1::ColumnDef { name: schema.name.clone(), - data_type: datatype as i32, + data_type: datatype.0 as i32, is_nullable: schema.is_nullable(), default_constraint: match schema.default_constraint() { None => vec![], @@ -354,6 +354,7 @@ pub fn column_schemas_to_defs( }, semantic_type, comment, + datatype_extension: datatype.1, }) }) .collect() diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 21bca1c33acd..937038d030f1 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -20,7 +20,6 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use common_base::BitVec; -use datatypes::prelude::ConcreteDataType; use datatypes::vectors::VectorRef; use snafu::prelude::*; use snafu::ResultExt; @@ -46,6 +45,7 @@ pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { column_name: column.column_name.clone(), datatype: column.datatype, semantic_type: column.semantic_type, + datatype_extension: column.datatype_extension.clone(), }; schema.push(column_schema); @@ -57,9 +57,10 @@ pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { let null_mask = BitVec::from_vec(column.null_mask); - let column_type = ColumnDataTypeWrapper::try_new(column.datatype) + let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension) .context(ColumnDataTypeSnafu)? - .datatype(); + .datatype() + .0; let column_values = column.values.unwrap_or_default(); macro_rules! push_column_values_match_types { @@ -177,6 +178,7 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { DurationNanosecondValue, duration_nanosecond_values ), + (Decimal128, Decimal128Value, decimal128_values), ); Ok(()) @@ -206,10 +208,16 @@ pub fn column_schema( columns .iter() .map(|(column_name, vector)| { + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(vector.data_type().clone()) + .context(ColumnDataTypeSnafu)? + .datatype(); + Ok(ColumnSchema { column_name: column_name.clone(), - datatype: data_type(vector.data_type())?.into(), + datatype: datatype as i32, semantic_type: semantic_type(table_info, column_name)?.into(), + datatype_extension, }) }) .collect::>>() @@ -245,11 +253,6 @@ fn semantic_type(table_info: &TableInfo, column: &str) -> Result { Ok(semantic_type) } -fn data_type(data_type: ConcreteDataType) -> Result { - let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?; - Ok(datatype.datatype()) -} - #[cfg(test)] mod tests { use api::v1::column::Values; @@ -270,6 +273,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }, Column { column_name: String::from("col2"), @@ -284,6 +288,7 @@ mod tests { ], ..Default::default() }), + ..Default::default() }, ]; let row_count = 3; @@ -335,6 +340,7 @@ mod tests { i8_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); @@ -349,6 +355,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); @@ -363,6 +370,7 @@ mod tests { i32_values: vec![42], ..Default::default() }), + ..Default::default() }]; let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index 0b578c8697c3..ba93ad0f4afb 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -156,6 +156,7 @@ mod tests { column_name: "a".to_string(), datatype: ColumnDataType::Int32 as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }], rows: rows .into_iter() diff --git a/src/operator/src/req_convert/insert.rs b/src/operator/src/req_convert/insert.rs index 96f88a739b7b..51984c4de034 100644 --- a/src/operator/src/req_convert/insert.rs +++ b/src/operator/src/req_convert/insert.rs @@ -17,17 +17,15 @@ mod row_to_region; mod stmt_to_region; mod table_to_region; -use api::helper::ColumnDataTypeWrapper; -use api::v1::{ColumnDataType, SemanticType}; +use api::v1::SemanticType; pub use column_to_row::ColumnToRow; -use datatypes::prelude::ConcreteDataType; pub use row_to_region::RowToRegion; use snafu::{OptionExt, ResultExt}; pub use stmt_to_region::StatementToRegion; use table::metadata::TableInfo; pub use table_to_region::TableToRegion; -use crate::error::{ColumnDataTypeSnafu, ColumnNotFoundSnafu, MissingTimeIndexColumnSnafu, Result}; +use crate::error::{ColumnNotFoundSnafu, MissingTimeIndexColumnSnafu, Result}; fn semantic_type(table_info: &TableInfo, column: &str) -> Result { let table_meta = &table_info.meta; @@ -58,8 +56,3 @@ fn semantic_type(table_info: &TableInfo, column: &str) -> Result { Ok(semantic_type) } - -fn data_type(data_type: ConcreteDataType) -> Result { - let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?; - Ok(datatype.datatype()) -} diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index 1230bb1427b3..dcdab7881d8c 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::helper::value_to_grpc_value; +use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper}; use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue}; use catalog::CatalogManager; @@ -25,10 +25,11 @@ use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; use table::TableRef; -use super::{data_type, semantic_type}; +use super::semantic_type; use crate::error::{ - CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu, - InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, TableNotFoundSnafu, + CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, + ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, + TableNotFoundSnafu, }; use crate::req_convert::common::partitioner::Partitioner; @@ -94,13 +95,17 @@ impl<'a> StatementToRegion<'a> { msg: format!("Column {} not found in table {}", column_name, &table_name), })?; - let datatype = data_type(column_schema.data_type.clone())?; + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) + .context(ColumnDataTypeSnafu)? + .datatype(); let semantic_type = semantic_type(&table_info, column_name)?; let grpc_column_schema = GrpcColumnSchema { column_name: column_name.clone(), datatype: datatype.into(), semantic_type: semantic_type.into(), + datatype_extension, }; schema.push(grpc_column_schema); diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index 5ece06b79f96..729355cf0159 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -156,6 +156,7 @@ mod tests { column_name: "a".to_string(), datatype: ColumnDataType::Int32 as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }], rows: rows .into_iter() diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index c31ee9afae94..7051a9c626a8 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -543,7 +543,8 @@ fn find_partition_entries( for column in column_defs { let column_name = &column.name; let data_type = ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?, + ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone()) + .context(ColumnDataTypeSnafu)?, ); column_name_and_type.push((column_name, data_type)); } diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index f7c64d8ba117..669121454206 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -144,16 +144,19 @@ mod tests { column_name: "id".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "name".to_string(), datatype: ColumnDataType::String as i32, semantic_type: SemanticType::Tag as i32, + ..Default::default() }, ColumnSchema { column_name: "age".to_string(), datatype: ColumnDataType::Uint32 as i32, semantic_type: SemanticType::Field as i32, + ..Default::default() }, ]; let rows = vec![ diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 6e52f7139d81..6ecbaa4e33ff 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -302,31 +302,37 @@ fn build_insert_column_schemas() -> Vec { column_name: "schema".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "name".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "engine".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Tag.into(), + ..Default::default() }, PbColumnSchema { column_name: "script".to_string(), datatype: ColumnDataType::String.into(), semantic_type: SemanticType::Field.into(), + ..Default::default() }, PbColumnSchema { column_name: "greptime_timestamp".to_string(), datatype: ColumnDataType::TimestampMillisecond.into(), semantic_type: SemanticType::Timestamp.into(), + ..Default::default() }, PbColumnSchema { column_name: "gmt_modified".to_string(), datatype: ColumnDataType::TimestampMillisecond.into(), semantic_type: SemanticType::Field.into(), + ..Default::default() }, ] } @@ -358,7 +364,9 @@ pub fn build_scripts_schema() -> RawSchema { let cs = ColumnSchema::new( c.column_name, // Safety: the type always exists - ColumnDataTypeWrapper::try_new(c.datatype).unwrap().into(), + ColumnDataTypeWrapper::try_new(c.datatype, c.datatype_extension) + .unwrap() + .into(), false, ); if c.semantic_type == SemanticType::Timestamp as i32 { diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 955bda833bcc..a0a4725fa861 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -577,6 +577,7 @@ mod tests { column_name: k.to_string(), datatype: t as i32, semantic_type: s as i32, + ..Default::default() }) .collect() } diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 9dfe92738706..b4daad3ecdac 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -215,6 +215,7 @@ fn write_by_semantic_type( column_name: name.to_string(), datatype: datatype as i32, semantic_type: semantic_type as i32, + ..Default::default() }); one_row.push(value.into()); } else { @@ -269,6 +270,7 @@ pub fn write_ts_precision( column_name: name, datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, + ..Default::default() }); one_row.push(ValueData::TimestampMillisecondValue(ts).into()) } else { diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 6e1c236fac4a..f4d2f4b84783 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -81,7 +81,11 @@ impl ColumnMetadata { .context(ConvertDatatypesSnafu)?, ) }; - let data_type = ColumnDataTypeWrapper::new(column_def.data_type()).into(); + let data_type = ColumnDataTypeWrapper::new( + column_def.data_type(), + column_def.datatype_extension.clone(), + ) + .into(); let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) .with_default_constraint(default_constrain) .context(ConvertDatatypesSnafu)?; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index e7633a2964a0..928f2abd73ee 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -534,6 +534,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Field as i32, comment: String::new(), + ..Default::default() }), column_id: 1, }), diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 89e68687aacb..f7dac2f82315 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -154,7 +154,8 @@ impl TryFrom for AddColumn { })?; let data_type = column_def.data_type; - let data_type = ColumnDataTypeWrapper::try_new(data_type) + let data_type_ext = column_def.datatype_extension.clone(); + let data_type = ColumnDataTypeWrapper::try_new(data_type, data_type_ext) .map_err(|_| { InvalidRawRegionRequestSnafu { err: format!("unknown raw column datatype: {data_type}"), @@ -317,6 +318,7 @@ mod tests { default_constraint: vec![], semantic_type: SemanticType::Tag as _, comment: String::new(), + ..Default::default() }), column_id: 1, }), @@ -333,6 +335,7 @@ mod tests { .unwrap(), semantic_type: SemanticType::Field as _, comment: String::new(), + ..Default::default() }), column_id: 2, }), diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 123e864dd553..ded795861476 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -340,6 +340,7 @@ CREATE TABLE {table_name} ( null_mask: vec![32, 0], semantic_type: SemanticType::Tag as i32, datatype: ColumnDataType::Int32 as i32, + ..Default::default() }, Column { column_name: "b".to_string(), @@ -573,6 +574,7 @@ CREATE TABLE {table_name} ( null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Int32 as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), @@ -611,6 +613,7 @@ CREATE TABLE {table_name} ( null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::String as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), @@ -743,6 +746,7 @@ CREATE TABLE {table_name} ( null_mask: vec![4], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }, Column { column_name: "ts".to_string(), diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 3d766a5a37f0..a0d19908f503 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -267,6 +267,7 @@ fn expect_data() -> (Column, Column, Column, Column) { null_mask: vec![2], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let expected_mem_col = Column { column_name: "memory".to_string(), @@ -277,6 +278,7 @@ fn expect_data() -> (Column, Column, Column, Column) { null_mask: vec![4], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, + ..Default::default() }; let expected_ts_col = Column { column_name: "ts".to_string(), From 495b24670e0b75cf28528c802e5da5c19876dfe5 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:07:18 +0800 Subject: [PATCH 02/12] feat: decimal128 grpc --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/api/Cargo.toml | 2 + src/common/decimal/src/decimal128.rs | 17 +++ src/common/grpc/src/select.rs | 39 ++++-- src/datatypes/src/data_type.rs | 2 + src/datatypes/src/types/decimal_type.rs | 22 +++- src/datatypes/src/vectors/decimal.rs | 19 +++ src/sql/src/statements.rs | 34 ++++- .../common/types/decimal/decimal_basic.result | 121 ++++++++++++++++++ .../common/types/decimal/decimal_basic.sql | 30 +++++ 11 files changed, 272 insertions(+), 20 deletions(-) create mode 100644 tests/cases/standalone/common/types/decimal/decimal_basic.result create mode 100644 tests/cases/standalone/common/types/decimal/decimal_basic.sql diff --git a/Cargo.lock b/Cargo.lock index 5da1245210c1..1bf0ec939981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,6 +199,7 @@ name = "api" version = "0.4.3" dependencies = [ "common-base", + "common-decimal", "common-error", "common-macro", "common-time", @@ -3538,7 +3539,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab" +source = "git+https://github.com/Quenkar/greptime-proto.git?rev=62659614fa415051c1ffb72f69f70612d67ed301#62659614fa415051c1ffb72f69f70612d67ed301" dependencies = [ "prost 0.12.2", "serde", @@ -8537,6 +8538,7 @@ dependencies = [ "common-error", "common-macro", "common-query", + "common-telemetry", "common-time", "datafusion-sql", "datatypes", diff --git a/Cargo.toml b/Cargo.toml index ba46247cf922..42ffa22f243e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" } +greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "62659614fa415051c1ffb72f69f70612d67ed301" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7c1ff3e04ba6..9beea1ff51b9 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -6,11 +6,13 @@ license.workspace = true [dependencies] common-base.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-time.workspace = true datatypes.workspace = true greptime-proto.workspace = true +paste = "1.0" prost.workspace = true snafu.workspace = true tonic.workspace = true diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index c68039fb57a7..78343f0a8c6e 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -100,6 +100,23 @@ impl Decimal128 { pub fn to_scalar_value(&self) -> (Option, u8, i8) { (Some(self.value), self.precision, self.scale) } + + /// Convert to PbDecimal128, the PbDecimal128 is represented as (high-64 bit, low-64 bit, precision, scale) + /// Return (high-64 bit, low-64 bit, precision, scale) + pub fn to_pb_decimal128(&self) -> (i64, i64, i32, i32) { + ( + (self.value >> 64) as i64, + self.value as i64, + self.precision as i32, + self.scale as i32, + ) + } + + /// Convert from PbDecimal128 + pub fn from_pb_decimal128(hi: i64, lo: i64, precision: i32, scale: i32) -> Self { + let value = (hi as i128) << 64 | lo as i128; + Self::new(value, precision as u8, scale as i8) + } } /// The default value of Decimal128 is 0, and its precision is 1 and scale is 0. diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 4c6e4a6af99c..a399525fdbdc 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::helper::convert_i128_to_interval; +use api::helper::{convert_i128_to_interval, convert_to_pb_decimal128}; use api::v1::column::Values; use common_base::BitVec; use datatypes::types::{DurationType, IntervalType, TimeType, TimestampType, WrapperType}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, - Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalDayTimeVector, - IntervalMonthDayNanoVector, IntervalYearMonthVector, StringVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, - UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, + Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, + StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, + TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, UInt32Vector, UInt64Vector, + UInt8Vector, VectorRef, }; use snafu::OptionExt; @@ -71,8 +72,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - // TODO(QuenKar): support gRPC for Decimal128 - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Decimal128(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } @@ -238,6 +238,12 @@ pub fn values(arrays: &[VectorRef]) -> Result { DurationNanosecondVector, duration_nanosecond_values, |x| { x.into_native() } + ), + ( + ConcreteDataType::Decimal128(_), + Decimal128Vector, + decimal128_values, + |x| { convert_to_pb_decimal128(x) } ) ) } @@ -315,6 +321,19 @@ mod tests { assert_eq!(vec![1, 2, 3], values.duration_second_values); } + #[test] + fn test_convert_arrow_array_decimal128() { + let array = Decimal128Vector::from(vec![Some(1), Some(2), None, Some(3)]); + + let vals = values(&[Arc::new(array)]).unwrap(); + (0..3).for_each(|i| { + assert_eq!(vals.decimal128_values[i].hi, 0); + assert_eq!(vals.decimal128_values[i].lo, i as i64 + 1); + assert_eq!(vals.decimal128_values[i].precision, 38,); + assert_eq!(vals.decimal128_values[i].scale, 10,); + }); + } + #[test] fn test_convert_arrow_arrays_string() { let array = StringVector::from(vec![ diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 877bf47e3bf5..61470eec6bd7 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -142,6 +142,7 @@ impl ConcreteDataType { | ConcreteDataType::Time(_) | ConcreteDataType::Interval(_) | ConcreteDataType::Duration(_) + | ConcreteDataType::Decimal128(_) ) } @@ -676,6 +677,7 @@ mod tests { assert!(ConcreteDataType::duration_millisecond_datatype().is_stringifiable()); assert!(ConcreteDataType::duration_microsecond_datatype().is_stringifiable()); assert!(ConcreteDataType::duration_nanosecond_datatype().is_stringifiable()); + assert!(ConcreteDataType::decimal128_datatype(10, 2).is_stringifiable()); } #[test] diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs index edda8fe9f7eb..8e780b2b5003 100644 --- a/src/datatypes/src/types/decimal_type.rs +++ b/src/datatypes/src/types/decimal_type.rs @@ -13,6 +13,7 @@ // limitations under the License. use arrow_schema::DataType as ArrowDataType; +use common_decimal::decimal128::DECIMAL128_MAX_PRECISION; use common_decimal::Decimal128; use serde::{Deserialize, Serialize}; @@ -32,7 +33,17 @@ pub struct Decimal128Type { impl Decimal128Type { pub fn new(precision: u8, scale: i8) -> Self { - Self { precision, scale } + // debug assert precision and scale is valid + debug_assert!( + precision > 0 && precision <= DECIMAL128_MAX_PRECISION, + "precision should be in [1, {}]", + DECIMAL128_MAX_PRECISION + ); + debug_assert!( + scale >= 0 && scale <= precision as i8, + "scale should be in [0, precision]" + ); + Decimal128Type { precision, scale } } pub fn precision(&self) -> u8 { @@ -46,7 +57,7 @@ impl Decimal128Type { impl DataType for Decimal128Type { fn name(&self) -> &str { - "decimal128" + "decimal" } fn logical_type_id(&self) -> LogicalTypeId { @@ -62,7 +73,12 @@ impl DataType for Decimal128Type { } fn create_mutable_vector(&self, capacity: usize) -> Box { - Box::new(Decimal128VectorBuilder::with_capacity(capacity)) + Box::new( + Decimal128VectorBuilder::with_capacity(capacity) + .with_precision_and_scale(self.precision, self.scale) + // safe to unwrap because we have validated the precision and scale in new() + .unwrap(), + ) } fn try_cast(&self, val: Value) -> Option { diff --git a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs index 1303303d7b66..ebcdb43d6e34 100644 --- a/src/datatypes/src/vectors/decimal.rs +++ b/src/datatypes/src/vectors/decimal.rs @@ -392,7 +392,26 @@ pub mod tests { let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]); let decimal_vector = Decimal128Vector::from(decimal_array); let expect = Decimal128Vector::from_values(vec![123, 456]); + assert_eq!(decimal_vector, expect); + + let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]) + .with_precision_and_scale(10, 2) + .unwrap(); + let decimal_vector = Decimal128Vector::from(decimal_array); + let expect = Decimal128Vector::from_values(vec![123, 456]) + .with_precision_and_scale(10, 2) + .unwrap(); + assert_eq!(decimal_vector, expect); + let decimal_array: ArrayRef = Arc::new( + Decimal128Array::from(vec![Some(123), Some(456)]) + .with_precision_and_scale(3, 2) + .unwrap(), + ); + let decimal_vector = Decimal128Vector::try_from_arrow_array(decimal_array).unwrap(); + let expect = Decimal128Vector::from_values(vec![123, 456]) + .with_precision_and_scale(3, 2) + .unwrap(); assert_eq!(decimal_vector, expect); } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index eae6551c30e1..e7a5d2d9386b 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -107,6 +107,16 @@ fn parse_string_to_value( .fail() } } + ConcreteDataType::Decimal128(_) => { + if let Ok(val) = common_decimal::Decimal128::from_str(&s) { + Ok(Value::Decimal128(val)) + } else { + ParseSqlValueSnafu { + msg: format!("Fail to parse number {s} to Decimal128 value"), + } + .fail() + } + } _ => { unreachable!() } @@ -146,7 +156,19 @@ macro_rules! parse_number_to_value { let n = parse_sql_number::($n)?; Ok(Value::Timestamp(Timestamp::new(n, t.unit()))) }, - // TODO(QuenKar): parse decimal128 string with precision and scale + // TODO(QuenKar): This could need to be optimized + // if this from_str function is slow, + // we can implement parse decimal string with precision and scale manually. + ConcreteDataType::Decimal128(_) => { + if let Ok(val) = common_decimal::Decimal128::from_str($n) { + Ok(Value::Decimal128(val)) + } else { + ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, invalid column type: {:?}", + $n, $data_type) + }.fail() + } + } _ => ParseSqlValueSnafu { msg: format!("Fail to parse number {}, invalid column type: {:?}", @@ -356,18 +378,20 @@ pub fn sql_column_def_to_grpc_column_def(col: &ColumnDef) -> Result Date: Tue, 21 Nov 2023 16:31:03 +0800 Subject: [PATCH 03/12] feat: add test case --- Cargo.lock | 1 - src/api/src/helper.rs | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 1bf0ec939981..5e45753a4113 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8538,7 +8538,6 @@ dependencies = [ "common-error", "common-macro", "common-query", - "common-telemetry", "common-time", "datafusion-sql", "datatypes", diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 91ffc4582e58..3c18033fc47b 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -1220,6 +1220,10 @@ mod tests { let values = values_with_capacity(ColumnDataType::DurationMillisecond, 2); let values = values.duration_millisecond_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::Decimal128, 2); + let values = values.decimal128_values; + assert_eq!(2, values.capacity()); } #[test] @@ -1307,6 +1311,10 @@ mod tests { assert_eq!( ConcreteDataType::duration_millisecond_datatype(), ColumnDataTypeWrapper::duration_millisecond_datatype().into() + ); + assert_eq!( + ConcreteDataType::decimal128_datatype(10, 2), + ColumnDataTypeWrapper::decimal128_datatype(10, 2).into() ) } @@ -1403,6 +1411,13 @@ mod tests { .unwrap() ); + assert_eq!( + ColumnDataTypeWrapper::decimal128_datatype(10, 2), + ConcreteDataType::decimal128_datatype(10, 2) + .try_into() + .unwrap() + ); + let result: Result = ConcreteDataType::null_datatype().try_into(); assert!(result.is_err()); assert_eq!( From 5c7ab97811f30813fd0a26c3309a0c45c0f41de4 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:34:37 +0800 Subject: [PATCH 04/12] chore: add TODO --- src/datatypes/src/types/decimal_type.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs index 8e780b2b5003..1c50adbd74bd 100644 --- a/src/datatypes/src/types/decimal_type.rs +++ b/src/datatypes/src/types/decimal_type.rs @@ -57,6 +57,7 @@ impl Decimal128Type { impl DataType for Decimal128Type { fn name(&self) -> &str { + // TODO(QuenKar): support precision and scale information in name "decimal" } From 1b85d1cf8e943ed3ebda092b5a5e83f56cb0e16e Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:46:14 +0800 Subject: [PATCH 05/12] chore: empty line --- tests/cases/standalone/common/types/decimal/decimal_basic.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cases/standalone/common/types/decimal/decimal_basic.sql b/tests/cases/standalone/common/types/decimal/decimal_basic.sql index 807740b0d66e..fe67ca03413d 100644 --- a/tests/cases/standalone/common/types/decimal/decimal_basic.sql +++ b/tests/cases/standalone/common/types/decimal/decimal_basic.sql @@ -27,4 +27,4 @@ select max(d) from decimals; select min(d) from decimals; select count(d) from decimals; -drop table decimals; \ No newline at end of file +drop table decimals; From d5e963e422da295739e1713cc0d1ead796f9ad21 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 22 Nov 2023 18:02:18 +0800 Subject: [PATCH 06/12] chore: remove precision and scale --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 142 +++++++++++---------------- src/common/decimal/src/decimal128.rs | 21 ++-- src/common/grpc/src/select.rs | 2 - src/mito2/src/request.rs | 13 +-- 6 files changed, 67 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e45753a4113..a862f4bef174 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3539,7 +3539,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/Quenkar/greptime-proto.git?rev=62659614fa415051c1ffb72f69f70612d67ed301#62659614fa415051c1ffb72f69f70612d67ed301" +source = "git+https://github.com/Quenkar/greptime-proto.git?rev=7de903043be441302a92c6ef6e843dd428d1d7bf#7de903043be441302a92c6ef6e843dd428d1d7bf" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index 42ffa22f243e..ffc1dfc829cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "62659614fa415051c1ffb72f69f70612d67ed301" } +greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "7de903043be441302a92c6ef6e843dd428d1d7bf" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 3c18033fc47b..a5a4e06b625d 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -35,14 +35,14 @@ use datatypes::vectors::{ TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, VectorRef, }; +use greptime_proto::v1; use greptime_proto::v1::column_data_type_extension::Ext; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ - self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, - SemanticType, + ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, SemanticType, }; use paste::paste; use snafu::prelude::*; @@ -50,6 +50,7 @@ use snafu::prelude::*; use crate::error::{self, Result}; use crate::v1::column::Values; use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; + #[derive(Debug, PartialEq)] pub struct ColumnDataTypeWrapper { datatype: ColumnDataType, @@ -506,14 +507,12 @@ pub fn convert_i128_to_interval(v: i128) -> v1::IntervalMonthDayNano { } } -/// Convert common decimal128 to grpc decimal128. +/// Convert common decimal128 to grpc decimal128 without precision and scale. pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 { - let (hi, lo, precision, scale) = v.to_pb_decimal128(); + let value = v.val(); v1::Decimal128 { - hi, - lo, - precision, - scale, + hi: (value >> 64) as i64, + lo: value as i64, } } @@ -562,12 +561,10 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::DurationMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), ValueData::DurationMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), - ValueData::Decimal128Value(v) => ValueRef::Decimal128(Decimal128::from_pb_decimal128( - v.hi, - v.lo, - v.precision, - v.scale, - )), + ValueData::Decimal128Value(v) => { + // Although the precision and scale are not used here, we still need to pass them in. + ValueRef::Decimal128(Decimal128::from_pb_decimal128(v.hi, v.lo, 38, 10)) + } } } @@ -655,11 +652,10 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> )), }, // When should I check precision and scale? - ConcreteDataType::Decimal128(_) => Arc::new(Decimal128Vector::from_values( - values - .decimal128_values - .iter() - .map(|x| Decimal128::from_pb_decimal128(x.hi, x.lo, x.precision, x.scale).into()), + ConcreteDataType::Decimal128(d) => Arc::new(Decimal128Vector::from_values( + values.decimal128_values.iter().map(|x| { + Decimal128::from_pb_decimal128(x.hi, x.lo, d.precision(), d.scale()).into() + }), )), ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() @@ -831,15 +827,15 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), - ConcreteDataType::Decimal128(_) => values + ConcreteDataType::Decimal128(d) => values .decimal128_values .into_iter() .map(|v| { Value::Decimal128(Decimal128::from_pb_decimal128( v.hi, v.lo, - v.precision, - v.scale, + d.precision(), + d.scale(), )) }) .collect(), @@ -972,14 +968,9 @@ pub fn to_proto_value(value: Value) -> Option { }, }, Value::Decimal128(v) => { - let (hi, lo, precision, scale) = v.to_pb_decimal128(); + let (hi, lo) = v.to_pb_decimal128(); v1::Value { - value_data: Some(ValueData::Decimal128Value(v1::Decimal128 { - hi, - lo, - precision, - scale, - })), + value_data: Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })), } } Value::List(_) => return None, @@ -988,60 +979,42 @@ pub fn to_proto_value(value: Value) -> Option { Some(proto_value) } -/// Returns the [ColumnDataType] of the value. +/// Returns the [ColumnDataTypeWrapper] of the value. /// /// If value is null, returns `None`. -pub fn proto_value_type(value: &v1::Value) -> Option { +pub fn proto_value_type(value: &v1::Value) -> Option { let value_type = match value.value_data.as_ref()? { - ValueData::I8Value(_) => ColumnDataTypeWrapper::int8_datatype(), - ValueData::I16Value(_) => ColumnDataTypeWrapper::int16_datatype(), - ValueData::I32Value(_) => ColumnDataTypeWrapper::int32_datatype(), - ValueData::I64Value(_) => ColumnDataTypeWrapper::int64_datatype(), - ValueData::U8Value(_) => ColumnDataTypeWrapper::uint8_datatype(), - ValueData::U16Value(_) => ColumnDataTypeWrapper::uint16_datatype(), - ValueData::U32Value(_) => ColumnDataTypeWrapper::uint32_datatype(), - ValueData::U64Value(_) => ColumnDataTypeWrapper::uint64_datatype(), - ValueData::F32Value(_) => ColumnDataTypeWrapper::float32_datatype(), - ValueData::F64Value(_) => ColumnDataTypeWrapper::float64_datatype(), - ValueData::BoolValue(_) => ColumnDataTypeWrapper::boolean_datatype(), - ValueData::BinaryValue(_) => ColumnDataTypeWrapper::binary_datatype(), - ValueData::StringValue(_) => ColumnDataTypeWrapper::string_datatype(), - ValueData::DateValue(_) => ColumnDataTypeWrapper::date_datatype(), - ValueData::DatetimeValue(_) => ColumnDataTypeWrapper::datetime_datatype(), - ValueData::TimestampSecondValue(_) => ColumnDataTypeWrapper::timestamp_second_datatype(), - ValueData::TimestampMillisecondValue(_) => { - ColumnDataTypeWrapper::timestamp_millisecond_datatype() - } - ValueData::TimestampMicrosecondValue(_) => { - ColumnDataTypeWrapper::timestamp_microsecond_datatype() - } - ValueData::TimestampNanosecondValue(_) => { - ColumnDataTypeWrapper::timestamp_nanosecond_datatype() - } - ValueData::TimeSecondValue(_) => ColumnDataTypeWrapper::time_second_datatype(), - ValueData::TimeMillisecondValue(_) => ColumnDataTypeWrapper::time_millisecond_datatype(), - ValueData::TimeMicrosecondValue(_) => ColumnDataTypeWrapper::time_microsecond_datatype(), - ValueData::TimeNanosecondValue(_) => ColumnDataTypeWrapper::time_nanosecond_datatype(), - ValueData::IntervalYearMonthValues(_) => { - ColumnDataTypeWrapper::interval_year_month_datatype() - } - ValueData::IntervalDayTimeValues(_) => ColumnDataTypeWrapper::interval_day_time_datatype(), - ValueData::IntervalMonthDayNanoValues(_) => { - ColumnDataTypeWrapper::interval_month_day_nano_datatype() - } - ValueData::DurationSecondValue(_) => ColumnDataTypeWrapper::duration_second_datatype(), - ValueData::DurationMillisecondValue(_) => { - ColumnDataTypeWrapper::duration_millisecond_datatype() - } - ValueData::DurationMicrosecondValue(_) => { - ColumnDataTypeWrapper::duration_microsecond_datatype() - } - ValueData::DurationNanosecondValue(_) => { - ColumnDataTypeWrapper::duration_nanosecond_datatype() - } - ValueData::Decimal128Value(v) => { - ColumnDataTypeWrapper::decimal128_datatype(v.precision, v.scale) - } + ValueData::I8Value(_) => ColumnDataType::Int8, + ValueData::I16Value(_) => ColumnDataType::Int16, + ValueData::I32Value(_) => ColumnDataType::Int32, + ValueData::I64Value(_) => ColumnDataType::Int64, + ValueData::U8Value(_) => ColumnDataType::Uint8, + ValueData::U16Value(_) => ColumnDataType::Uint16, + ValueData::U32Value(_) => ColumnDataType::Uint32, + ValueData::U64Value(_) => ColumnDataType::Uint64, + ValueData::F32Value(_) => ColumnDataType::Float32, + ValueData::F64Value(_) => ColumnDataType::Float64, + ValueData::BoolValue(_) => ColumnDataType::Boolean, + ValueData::BinaryValue(_) => ColumnDataType::Binary, + ValueData::StringValue(_) => ColumnDataType::String, + ValueData::DateValue(_) => ColumnDataType::Date, + ValueData::DatetimeValue(_) => ColumnDataType::Datetime, + ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond, + ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond, + ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, + ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, + ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth, + ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime, + ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano, + ValueData::DurationSecondValue(_) => ColumnDataType::DurationSecond, + ValueData::DurationMillisecondValue(_) => ColumnDataType::DurationMillisecond, + ValueData::DurationMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, + ValueData::DurationNanosecondValue(_) => ColumnDataType::DurationNanosecond, + ValueData::Decimal128Value(_) => ColumnDataType::Decimal128, }; Some(value_type) } @@ -1105,13 +1078,8 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), Value::Decimal128(v) => { - let (hi, lo, precision, scale) = v.to_pb_decimal128(); - Some(ValueData::Decimal128Value(v1::Decimal128 { - hi, - lo, - precision, - scale, - })) + let (hi, lo) = v.to_pb_decimal128(); + Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })) } Value::List(_) => unreachable!(), }, diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index 78343f0a8c6e..bd362826363f 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -96,26 +96,23 @@ impl Decimal128 { self.scale } - /// Convert to ScalarValue + /// Convert to ScalarValue(value,precision,scale) pub fn to_scalar_value(&self) -> (Option, u8, i8) { (Some(self.value), self.precision, self.scale) } - /// Convert to PbDecimal128, the PbDecimal128 is represented as (high-64 bit, low-64 bit, precision, scale) - /// Return (high-64 bit, low-64 bit, precision, scale) - pub fn to_pb_decimal128(&self) -> (i64, i64, i32, i32) { - ( - (self.value >> 64) as i64, - self.value as i64, - self.precision as i32, - self.scale as i32, - ) + /// Convert to PbDecimal128, the PbDecimal128 is represented as (high-64 bit, low-64 bit), + /// and the precision and scale is not stored in PbDecimal128. + /// + /// Return (high-64 bit, low-64 bit) + pub fn to_pb_decimal128(&self) -> (i64, i64) { + ((self.value >> 64) as i64, self.value as i64) } /// Convert from PbDecimal128 - pub fn from_pb_decimal128(hi: i64, lo: i64, precision: i32, scale: i32) -> Self { + pub fn from_pb_decimal128(hi: i64, lo: i64, precision: u8, scale: i8) -> Self { let value = (hi as i128) << 64 | lo as i128; - Self::new(value, precision as u8, scale as i8) + Self::new(value, precision, scale) } } diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index a399525fdbdc..6b53a5900c73 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -329,8 +329,6 @@ mod tests { (0..3).for_each(|i| { assert_eq!(vals.decimal128_values[i].hi, 0); assert_eq!(vals.decimal128_values[i].lo, i as i64 + 1); - assert_eq!(vals.decimal128_values[i].precision, 38,); - assert_eq!(vals.decimal128_values[i].scale, 10,); }); } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 14cb5281c174..c2ace16ca97b 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -358,8 +358,7 @@ pub(crate) fn validate_proto_value( value: &Value, column_schema: &ColumnSchema, ) -> Result<()> { - if let Some(column_type) = proto_value_type(value) { - let (value_type, value_type_ext) = column_type.datatype(); + if let Some(value_type) = proto_value_type(value) { ensure!( value_type as i32 == column_schema.datatype, InvalidRequestSnafu { @@ -373,16 +372,6 @@ pub(crate) fn validate_proto_value( ), } ); - ensure!( - value_type_ext == column_schema.datatype_extension, - InvalidRequestSnafu { - region_id, - reason: format!( - "value has type extension {:?}, but column {} has type extension {:?}", - value_type_ext, column_schema.column_name, column_schema.datatype_extension, - ), - } - ); } Ok(()) From f9c07d60c6504b58611bfa11e89f2934e9438a95 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 23 Nov 2023 12:05:52 +0800 Subject: [PATCH 07/12] refactor: remove precision and scale --- src/api/src/helper.rs | 33 ++++++++++++++++++++++++---- src/mito2/src/memtable/key_values.rs | 24 ++++++++++++++++---- src/partition/src/splitter.rs | 6 ++++- 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index a5a4e06b625d..b5e916268550 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_base::BitVec; +use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION}; use common_decimal::Decimal128; use common_time::interval::IntervalUnit; use common_time::time::Time; @@ -51,6 +52,8 @@ use crate::error::{self, Result}; use crate::v1::column::Values; use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; +/// ColumnDataTypeWrapper is a wrapper of ColumnDataType and ColumnDataTypeExtension. +/// It could be used to convert with ConcreteDataType. #[derive(Debug, PartialEq)] pub struct ColumnDataTypeWrapper { datatype: ColumnDataType, @@ -58,6 +61,7 @@ pub struct ColumnDataTypeWrapper { } impl ColumnDataTypeWrapper { + /// Try to create a ColumnDataTypeWrapper from i32(ColumnDataType) and ColumnDataTypeExtension. pub fn try_new(datatype: i32, datatype_ext: Option) -> Result { let datatype = ColumnDataType::try_from(datatype) .context(error::UnknownColumnDataTypeSnafu { datatype })?; @@ -67,6 +71,7 @@ impl ColumnDataTypeWrapper { }) } + /// Create a ColumnDataTypeWrapper from ColumnDataType and ColumnDataTypeExtension. pub fn new(datatype: ColumnDataType, datatype_ext: Option) -> Self { Self { datatype, @@ -74,6 +79,7 @@ impl ColumnDataTypeWrapper { } } + /// Get a tuple of ColumnDataType and ColumnDataTypeExtension. pub fn datatype(&self) -> (ColumnDataType, Option) { (self.datatype, self.datatype_ext.clone()) } @@ -516,7 +522,10 @@ pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 { } } -pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { +pub fn pb_value_to_value_ref<'a>( + value: &'a v1::Value, + datatype_ext: &'a Option, +) -> ValueRef<'a> { let Some(value) = &value.value_data else { return ValueRef::Null; }; @@ -562,8 +571,25 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::DurationMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), ValueData::Decimal128Value(v) => { - // Although the precision and scale are not used here, we still need to pass them in. - ValueRef::Decimal128(Decimal128::from_pb_decimal128(v.hi, v.lo, 38, 10)) + // get precision and scale from datatype_extension + if let Some(Ext::DecimalType(d)) = + datatype_ext.as_ref().and_then(|ext| ext.ext.as_ref()) + { + ValueRef::Decimal128(Decimal128::from_pb_decimal128( + v.hi, + v.lo, + d.precision as u8, + d.scale as i8, + )) + } else { + // If the precision and scale are not set, use the default value. + ValueRef::Decimal128(Decimal128::from_pb_decimal128( + v.hi, + v.lo, + DECIMAL128_MAX_PRECISION, + DECIMAL128_DEFAULT_SCALE, + )) + } } } } @@ -651,7 +677,6 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> values.duration_nanosecond_values, )), }, - // When should I check precision and scale? ConcreteDataType::Decimal128(d) => Arc::new(Decimal128Vector::from_values( values.decimal128_values.iter().map(|x| { Decimal128::from_pb_decimal128(x.hi, x.lo, d.precision(), d.scale()).into() diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index ce29a89ee051..8b4a44ca04eb 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::{Mutation, OpType, Row, Rows}; +use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows}; use datatypes::value::ValueRef; use store_api::metadata::RegionMetadata; use store_api::storage::SequenceNumber; @@ -45,9 +45,11 @@ impl KeyValues { /// Returns a key value iterator. pub fn iter(&self) -> impl Iterator { let rows = self.mutation.rows.as_ref().unwrap(); + let schemas = &rows.schema; rows.rows.iter().enumerate().map(|(idx, row)| { KeyValue { row, + schemas, helper: &self.helper, sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row. // Safety: This is a valid mutation. @@ -72,6 +74,7 @@ impl KeyValues { #[derive(Debug)] pub struct KeyValue<'a> { row: &'a Row, + schemas: &'a Vec, helper: &'a ReadRowHelper, sequence: SequenceNumber, op_type: OpType, @@ -82,21 +85,34 @@ impl<'a> KeyValue<'a> { pub fn primary_keys(&self) -> impl Iterator { self.helper.indices[..self.helper.num_primary_key_column] .iter() - .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) + .map(|idx| { + api::helper::pb_value_to_value_ref( + &self.row.values[*idx], + &self.schemas[*idx].datatype_extension, + ) + }) } /// Get field columns. pub fn fields(&self) -> impl Iterator { self.helper.indices[self.helper.num_primary_key_column + 1..] .iter() - .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) + .map(|idx| { + api::helper::pb_value_to_value_ref( + &self.row.values[*idx], + &self.schemas[*idx].datatype_extension, + ) + }) } /// Get timestamp. pub fn timestamp(&self) -> ValueRef { // Timestamp is primitive, we clone it. let index = self.helper.indices[self.helper.num_primary_key_column]; - api::helper::pb_value_to_value_ref(&self.row.values[index]) + api::helper::pb_value_to_value_ref( + &self.row.values[index], + &self.schemas[index].datatype_extension, + ) } /// Get number of primary key columns. diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 669121454206..ed0eaa2cc571 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -117,7 +117,11 @@ impl<'a> SplitReadRowHelper<'a> { .iter() .map(|idx| { idx.as_ref().map_or(Value::Null, |idx| { - helper::pb_value_to_value_ref(&row.values[*idx]).into() + helper::pb_value_to_value_ref( + &row.values[*idx], + &self.schema[*idx].datatype_extension, + ) + .into() }) }) .collect() From 90199632a32e53b33253984c325c1ec2bd56e846 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 23 Nov 2023 12:06:49 +0800 Subject: [PATCH 08/12] chore: remove sqlness test --- .../common/types/decimal/decimal_basic.result | 121 ------------------ .../common/types/decimal/decimal_basic.sql | 30 ----- 2 files changed, 151 deletions(-) delete mode 100644 tests/cases/standalone/common/types/decimal/decimal_basic.result delete mode 100644 tests/cases/standalone/common/types/decimal/decimal_basic.sql diff --git a/tests/cases/standalone/common/types/decimal/decimal_basic.result b/tests/cases/standalone/common/types/decimal/decimal_basic.result deleted file mode 100644 index 8763b16deeef..000000000000 --- a/tests/cases/standalone/common/types/decimal/decimal_basic.result +++ /dev/null @@ -1,121 +0,0 @@ --- Basic tests for decimal type --- TODO(QuenKar): port more decimal tests from DuckDB -create table decimals( - ts timestamp time index, - d decimal(10, 2), -); - -Affected Rows: 0 - -insert into decimals values (1000, 1.23); - -Affected Rows: 1 - -insert into decimals values - (2000, 3.14), - (3000, '123.45'), - (4000, '1234.56'); - -Affected Rows: 3 - -select * from decimals; - -+---------------------+---------+ -| ts | d | -+---------------------+---------+ -| 1970-01-01T00:00:01 | 1.23 | -| 1970-01-01T00:00:02 | 3.14 | -| 1970-01-01T00:00:03 | 123.45 | -| 1970-01-01T00:00:04 | 1234.56 | -+---------------------+---------+ - --- math operations -select d + 1 from decimals; - -+-----------------------+ -| decimals.d + Int64(1) | -+-----------------------+ -| 2.23 | -| 4.14 | -| 124.45 | -| 1235.56 | -+-----------------------+ - -select d - 1 from decimals; - -+-----------------------+ -| decimals.d - Int64(1) | -+-----------------------+ -| 0.23 | -| 2.14 | -| 122.45 | -| 1233.56 | -+-----------------------+ - -select d * 2 from decimals; - -+-----------------------+ -| decimals.d * Int64(2) | -+-----------------------+ -| 2.46 | -| 6.28 | -| 246.90 | -| 2469.12 | -+-----------------------+ - -select d / 2 from decimals; - -+-----------------------+ -| decimals.d / Int64(2) | -+-----------------------+ -| 0.615000 | -| 1.570000 | -| 61.725000 | -| 617.280000 | -+-----------------------+ - --- aggregate functions -select sum(d) from decimals; - -+-----------------+ -| SUM(decimals.d) | -+-----------------+ -| 1362.38 | -+-----------------+ - -select avg(d) from decimals; - -+-----------------+ -| AVG(decimals.d) | -+-----------------+ -| 340.595000 | -+-----------------+ - -select max(d) from decimals; - -+-----------------+ -| MAX(decimals.d) | -+-----------------+ -| 1234.56 | -+-----------------+ - -select min(d) from decimals; - -+-----------------+ -| MIN(decimals.d) | -+-----------------+ -| 1.23 | -+-----------------+ - -select count(d) from decimals; - -+-------------------+ -| COUNT(decimals.d) | -+-------------------+ -| 4 | -+-------------------+ - -drop table decimals; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/common/types/decimal/decimal_basic.sql b/tests/cases/standalone/common/types/decimal/decimal_basic.sql deleted file mode 100644 index fe67ca03413d..000000000000 --- a/tests/cases/standalone/common/types/decimal/decimal_basic.sql +++ /dev/null @@ -1,30 +0,0 @@ --- Basic tests for decimal type --- TODO(QuenKar): port more decimal tests from DuckDB -create table decimals( - ts timestamp time index, - d decimal(10, 2), -); - -insert into decimals values (1000, 1.23); - -insert into decimals values - (2000, 3.14), - (3000, '123.45'), - (4000, '1234.56'); - -select * from decimals; - --- math operations -select d + 1 from decimals; -select d - 1 from decimals; -select d * 2 from decimals; -select d / 2 from decimals; - --- aggregate functions -select sum(d) from decimals; -select avg(d) from decimals; -select max(d) from decimals; -select min(d) from decimals; -select count(d) from decimals; - -drop table decimals; From 9cf49a80d0d513fe8d766597aee15e036c5b24e7 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 23 Nov 2023 14:34:00 +0800 Subject: [PATCH 09/12] chore: rename --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 18 +++++++++++------- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a862f4bef174..950a265fcf7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3539,7 +3539,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/Quenkar/greptime-proto.git?rev=7de903043be441302a92c6ef6e843dd428d1d7bf#7de903043be441302a92c6ef6e843dd428d1d7bf" +source = "git+https://github.com/Quenkar/greptime-proto.git?rev=4ac316719be91132bcd7ca39d9d19151fedeba4f#4ac316719be91132bcd7ca39d9d19151fedeba4f" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index ffc1dfc829cb..4ac9382135eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "7de903043be441302a92c6ef6e843dd428d1d7bf" } +greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "4ac316719be91132bcd7ca39d9d19151fedeba4f" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index b5e916268550..3595bc640756 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -37,7 +37,7 @@ use datatypes::vectors::{ TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, VectorRef, }; use greptime_proto::v1; -use greptime_proto::v1::column_data_type_extension::Ext; +use greptime_proto::v1::column_data_type_extension::TypeExt; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; @@ -131,10 +131,10 @@ impl From for ConcreteDataType { } ColumnDataType::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), ColumnDataType::Decimal128 => { - if let Some(Ext::DecimalType(d)) = datatype_wrapper + if let Some(TypeExt::DecimalType(d)) = datatype_wrapper .datatype_ext .as_ref() - .and_then(|datatype_ext| datatype_ext.ext.as_ref()) + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) { ConcreteDataType::decimal128_datatype(d.precision as u8, d.scale as i8) } else { @@ -207,7 +207,10 @@ impl ColumnDataTypeWrapper { ColumnDataTypeWrapper { datatype: ColumnDataType::Decimal128, datatype_ext: Some(ColumnDataTypeExtension { - ext: Some(Ext::DecimalType(DecimalTypeExtension { precision, scale })), + type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { + precision, + scale, + })), }), } } @@ -268,7 +271,7 @@ impl TryFrom for ColumnDataTypeWrapper { datatype .as_decimal128() .map(|decimal_type| ColumnDataTypeExtension { - ext: Some(Ext::DecimalType(DecimalTypeExtension { + type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { precision: decimal_type.precision() as i32, scale: decimal_type.scale() as i32, })), @@ -572,8 +575,9 @@ pub fn pb_value_to_value_ref<'a>( ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), ValueData::Decimal128Value(v) => { // get precision and scale from datatype_extension - if let Some(Ext::DecimalType(d)) = - datatype_ext.as_ref().and_then(|ext| ext.ext.as_ref()) + if let Some(TypeExt::DecimalType(d)) = datatype_ext + .as_ref() + .and_then(|column_ext| column_ext.type_ext.as_ref()) { ValueRef::Decimal128(Decimal128::from_pb_decimal128( v.hi, From c8c7d6ec6071ceff513094125e6bdce236995725 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 24 Nov 2023 10:45:41 +0800 Subject: [PATCH 10/12] chore: proto version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 950a265fcf7b..a2b2c0ccecf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3539,7 +3539,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/Quenkar/greptime-proto.git?rev=4ac316719be91132bcd7ca39d9d19151fedeba4f#4ac316719be91132bcd7ca39d9d19151fedeba4f" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11efce55d8ce20257e08842e4f4c1c8fce2b3a8#a11efce55d8ce20257e08842e4f4c1c8fce2b3a8" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4ac9382135eb..4c3c78eef2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "4ac316719be91132bcd7ca39d9d19151fedeba4f" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11efce55d8ce20257e08842e4f4c1c8fce2b3a8" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" From a108e4d8b8527a0c7e04f65779dc9a6452b1a996 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 24 Nov 2023 16:50:50 +0800 Subject: [PATCH 11/12] chore: cr comment. Co-authored-by: Yingwen --- src/api/src/helper.rs | 56 ++++++++++++------- src/common/decimal/src/decimal128.rs | 13 +++-- src/datatypes/src/types/decimal_type.rs | 6 +- src/mito2/src/memtable/key_values.rs | 12 ++-- src/mito2/src/memtable/time_series.rs | 2 +- src/mito2/src/request.rs | 4 +- src/mito2/src/test_util.rs | 2 +- src/operator/src/expr_factory.rs | 2 +- src/operator/src/req_convert/common.rs | 5 +- .../src/req_convert/insert/stmt_to_region.rs | 2 +- src/sql/src/statements.rs | 2 +- 11 files changed, 60 insertions(+), 46 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 3595bc640756..40b8d1533125 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -79,8 +79,13 @@ impl ColumnDataTypeWrapper { } } + /// Get the ColumnDataType. + pub fn datatype(&self) -> ColumnDataType { + self.datatype + } + /// Get a tuple of ColumnDataType and ColumnDataTypeExtension. - pub fn datatype(&self) -> (ColumnDataType, Option) { + pub fn to_parts(&self) -> (ColumnDataType, Option) { (self.datatype, self.datatype_ext.clone()) } } @@ -145,6 +150,12 @@ impl From for ConcreteDataType { } } +/// This macro is used to generate datatype functions +/// with lower style for ColumnDataTypeWrapper. +/// +/// +/// For example: we can use `ColumnDataTypeWrapper::int8_datatype()`, +/// to get a ColumnDataTypeWrapper with datatype `ColumnDataType::Int8`. macro_rules! impl_column_type_functions { ($($Type: ident), +) => { paste! { @@ -162,6 +173,12 @@ macro_rules! impl_column_type_functions { } } +/// This macro is used to generate datatype functions +/// with snake style for ColumnDataTypeWrapper. +/// +/// +/// For example: we can use `ColumnDataTypeWrapper::duration_second_datatype()`, +/// to get a ColumnDataTypeWrapper with datatype `ColumnDataType::DurationSecond`. macro_rules! impl_column_type_functions_with_snake { ($($TypeName: ident), +) => { paste!{ @@ -579,7 +596,7 @@ pub fn pb_value_to_value_ref<'a>( .as_ref() .and_then(|column_ext| column_ext.type_ext.as_ref()) { - ValueRef::Decimal128(Decimal128::from_pb_decimal128( + ValueRef::Decimal128(Decimal128::from_value_precision_scale( v.hi, v.lo, d.precision as u8, @@ -587,7 +604,7 @@ pub fn pb_value_to_value_ref<'a>( )) } else { // If the precision and scale are not set, use the default value. - ValueRef::Decimal128(Decimal128::from_pb_decimal128( + ValueRef::Decimal128(Decimal128::from_value_precision_scale( v.hi, v.lo, DECIMAL128_MAX_PRECISION, @@ -683,7 +700,7 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> }, ConcreteDataType::Decimal128(d) => Arc::new(Decimal128Vector::from_values( values.decimal128_values.iter().map(|x| { - Decimal128::from_pb_decimal128(x.hi, x.lo, d.precision(), d.scale()).into() + Decimal128::from_value_precision_scale(x.hi, x.lo, d.precision(), d.scale()).into() }), )), ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { @@ -860,7 +877,7 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .decimal128_values .into_iter() .map(|v| { - Value::Decimal128(Decimal128::from_pb_decimal128( + Value::Decimal128(Decimal128::from_value_precision_scale( v.hi, v.lo, d.precision(), @@ -885,11 +902,9 @@ pub fn is_column_type_value_eq( type_extension: Option, expect_type: &ConcreteDataType, ) -> bool { - let Ok(column_type_wrapper) = ColumnDataTypeWrapper::try_new(type_value, type_extension) else { - return false; - }; - - is_column_type_eq(column_type_wrapper, expect_type) + ColumnDataTypeWrapper::try_new(type_value, type_extension) + .map(|wrapper| ConcreteDataType::from(wrapper) == *expect_type) + .unwrap_or(false) } /// Convert value into proto's value. @@ -997,7 +1012,7 @@ pub fn to_proto_value(value: Value) -> Option { }, }, Value::Decimal128(v) => { - let (hi, lo) = v.to_pb_decimal128(); + let (hi, lo) = v.split_value(); v1::Value { value_data: Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })), } @@ -1107,7 +1122,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), Value::Decimal128(v) => { - let (hi, lo) = v.to_pb_decimal128(); + let (hi, lo) = v.split_value(); Some(ValueData::Decimal128Value(v1::Decimal128 { hi, lo })) } Value::List(_) => unreachable!(), @@ -1115,15 +1130,6 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { } } -/// Returns true if the column type is equal to expected type. -fn is_column_type_eq(column_type: ColumnDataTypeWrapper, expect_type: &ConcreteDataType) -> bool { - if let Ok(expect) = ColumnDataTypeWrapper::try_from(expect_type.clone()) { - column_type == expect - } else { - false - } -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -2036,4 +2042,12 @@ mod tests { &ConcreteDataType::boolean_datatype(), )); } + + #[test] + fn test_convert_to_pb_decimal128() { + let decimal = Decimal128::new(123, 3, 1); + let pb_decimal = convert_to_pb_decimal128(decimal); + assert_eq!(pb_decimal.lo, 123); + assert_eq!(pb_decimal.hi, 0); + } } diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index bd362826363f..ce23fcf98a97 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -101,16 +101,17 @@ impl Decimal128 { (Some(self.value), self.precision, self.scale) } - /// Convert to PbDecimal128, the PbDecimal128 is represented as (high-64 bit, low-64 bit), - /// and the precision and scale is not stored in PbDecimal128. + /// split the self.value(i128) to (high-64 bit, low-64 bit), and + /// the precision, scale information is discarded. /// - /// Return (high-64 bit, low-64 bit) - pub fn to_pb_decimal128(&self) -> (i64, i64) { + /// Return: (high-64 bit, low-64 bit) + pub fn split_value(&self) -> (i64, i64) { ((self.value >> 64) as i64, self.value as i64) } - /// Convert from PbDecimal128 - pub fn from_pb_decimal128(hi: i64, lo: i64, precision: u8, scale: i8) -> Self { + /// Convert from precision, scale, a i128 value which + /// represents by two i64 value(high-64 bit, low-64 bit). + pub fn from_value_precision_scale(hi: i64, lo: i64, precision: u8, scale: i8) -> Self { let value = (hi as i128) << 64 | lo as i128; Self::new(value, precision, scale) } diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs index 1c50adbd74bd..48ede0c44136 100644 --- a/src/datatypes/src/types/decimal_type.rs +++ b/src/datatypes/src/types/decimal_type.rs @@ -33,13 +33,13 @@ pub struct Decimal128Type { impl Decimal128Type { pub fn new(precision: u8, scale: i8) -> Self { - // debug assert precision and scale is valid - debug_assert!( + // assert precision and scale is valid + assert!( precision > 0 && precision <= DECIMAL128_MAX_PRECISION, "precision should be in [1, {}]", DECIMAL128_MAX_PRECISION ); - debug_assert!( + assert!( scale >= 0 && scale <= precision as i8, "scale should be in [0, precision]" ); diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 8b4a44ca04eb..10854d23a8ae 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -45,11 +45,11 @@ impl KeyValues { /// Returns a key value iterator. pub fn iter(&self) -> impl Iterator { let rows = self.mutation.rows.as_ref().unwrap(); - let schemas = &rows.schema; + let schema = &rows.schema; rows.rows.iter().enumerate().map(|(idx, row)| { KeyValue { row, - schemas, + schema, helper: &self.helper, sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row. // Safety: This is a valid mutation. @@ -74,7 +74,7 @@ impl KeyValues { #[derive(Debug)] pub struct KeyValue<'a> { row: &'a Row, - schemas: &'a Vec, + schema: &'a Vec, helper: &'a ReadRowHelper, sequence: SequenceNumber, op_type: OpType, @@ -88,7 +88,7 @@ impl<'a> KeyValue<'a> { .map(|idx| { api::helper::pb_value_to_value_ref( &self.row.values[*idx], - &self.schemas[*idx].datatype_extension, + &self.schema[*idx].datatype_extension, ) }) } @@ -100,7 +100,7 @@ impl<'a> KeyValue<'a> { .map(|idx| { api::helper::pb_value_to_value_ref( &self.row.values[*idx], - &self.schemas[*idx].datatype_extension, + &self.schema[*idx].datatype_extension, ) }) } @@ -111,7 +111,7 @@ impl<'a> KeyValue<'a> { let index = self.helper.indices[self.helper.num_primary_key_column]; api::helper::pb_value_to_value_ref( &self.row.values[index], - &self.schemas[index].datatype_extension, + &self.schema[index].datatype_extension, ) } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 90ccdc6540c8..350265dae4ed 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -951,7 +951,7 @@ mod tests { column_name: c.column_schema.name.clone(), datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) .unwrap() - .datatype() + .to_parts() .0 as i32, semantic_type: c.semantic_type as i32, ..Default::default() diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c2ace16ca97b..5ebd5fae110c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -254,13 +254,13 @@ impl WriteRequest { // Insert column schema. let (datatype, datatype_ext) = ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone()) - .context(ConvertColumnDataTypeSnafu { + .with_context(|_| ConvertColumnDataTypeSnafu { reason: format!( "no protobuf type for column {} ({:?})", column.column_schema.name, column.column_schema.data_type ), })? - .datatype(); + .to_parts(); self.rows.schema.push(ColumnSchema { column_name: column.column_schema.name.clone(), datatype: datatype as i32, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e5f24788375f..59738d0e2ad4 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -533,7 +533,7 @@ pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api let (datatype, datatype_extension) = ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) .unwrap() - .datatype(); + .to_parts(); api::v1::ColumnSchema { column_name: metadata.column_schema.name.clone(), datatype: datatype as i32, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index b4ddfd0cf508..aec2b51566d2 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -316,7 +316,7 @@ pub fn column_schemas_to_defs( .iter() .map(|c| { ColumnDataTypeWrapper::try_from(c.data_type.clone()) - .map(|w| w.datatype()) + .map(|w| w.to_parts()) .context(ColumnDataTypeSnafu) }) .collect::>>()?; diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 937038d030f1..694906b82833 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -59,8 +59,7 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { let null_mask = BitVec::from_vec(column.null_mask); let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension) .context(ColumnDataTypeSnafu)? - .datatype() - .0; + .datatype(); let column_values = column.values.unwrap_or_default(); macro_rules! push_column_values_match_types { @@ -211,7 +210,7 @@ pub fn column_schema( let (datatype, datatype_extension) = ColumnDataTypeWrapper::try_from(vector.data_type().clone()) .context(ColumnDataTypeSnafu)? - .datatype(); + .to_parts(); Ok(ColumnSchema { column_name: column_name.clone(), diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index dcdab7881d8c..1297adf6a5de 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -98,7 +98,7 @@ impl<'a> StatementToRegion<'a> { let (datatype, datatype_extension) = ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) .context(ColumnDataTypeSnafu)? - .datatype(); + .to_parts(); let semantic_type = semantic_type(&table_info, column_name)?; let grpc_column_schema = GrpcColumnSchema { diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index e7a5d2d9386b..e9594aac8e7a 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -381,7 +381,7 @@ pub fn sql_column_def_to_grpc_column_def(col: &ColumnDef) -> Result Date: Fri, 24 Nov 2023 17:08:56 +0800 Subject: [PATCH 12/12] Update src/mito2/src/memtable/time_series.rs Co-authored-by: Yingwen --- src/mito2/src/memtable/time_series.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 350265dae4ed..4dd590212c33 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -951,8 +951,7 @@ mod tests { column_name: c.column_schema.name.clone(), datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) .unwrap() - .to_parts() - .0 as i32, + .datatype() as i32, semantic_type: c.semantic_type as i32, ..Default::default() })