From 106368383f06a2e567e2ca06544182a59e81c948 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 14 Aug 2023 14:58:30 +0800 Subject: [PATCH 1/9] feat: impl datatype, vector traits for duration. --- src/api/src/helper.rs | 2 + src/common/grpc/src/select.rs | 31 ++- src/datatypes/src/data_type.rs | 50 ++++- src/datatypes/src/duration.rs | 142 +++++++++++++ src/datatypes/src/lib.rs | 1 + src/datatypes/src/type_id.rs | 10 + src/datatypes/src/types.rs | 5 + src/datatypes/src/types/duration_type.rs | 182 +++++++++++++++++ src/datatypes/src/value.rs | 45 ++++- src/datatypes/src/vectors.rs | 6 + src/datatypes/src/vectors/duration.rs | 30 +++ src/datatypes/src/vectors/eq.rs | 16 +- src/mito2/src/proto_util.rs | 241 +++++++++++++++++++++++ src/servers/src/mysql/writer.rs | 2 +- src/servers/src/postgres/types.rs | 18 +- src/sql/src/statements.rs | 7 +- 16 files changed, 763 insertions(+), 25 deletions(-) create mode 100644 src/datatypes/src/duration.rs create mode 100644 src/datatypes/src/types/duration_type.rs create mode 100644 src/datatypes/src/vectors/duration.rs create mode 100644 src/mito2/src/proto_util.rs diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index f9ac18ed0c14..238894cd6746 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -146,6 +146,7 @@ impl TryFrom for ColumnDataTypeWrapper { | ConcreteDataType::Dictionary(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } + ConcreteDataType::Duration(_) => todo!("duration is not supported yet"), }); Ok(datatype) } @@ -304,6 +305,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { .interval_month_day_nano_values .push(convert_i128_to_interval(val.to_i128())), }, + Value::Duration(_) => todo!("duration is not supported yet"), Value::List(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index d16f2f37177c..631cb02d4d84 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -15,10 +15,11 @@ use api::helper::convert_i128_to_interval; use api::v1::column::Values; use common_base::BitVec; -use datatypes::types::{IntervalType, TimeType, TimestampType, WrapperType}; +use datatypes::types::{DurationType, IntervalType, TimeType, TimestampType, WrapperType}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalDayTimeVector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, + DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, + Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, @@ -212,6 +213,30 @@ pub fn values(arrays: &[VectorRef]) -> Result { IntervalMonthDayNanoVector, interval_month_day_nano_values, |x| { convert_i128_to_interval(x.into_native()) } + ), + ( + ConcreteDataType::Duration(DurationType::Second(_)), + DurationSecondVector, + ts_second_values, + |x| { x.into_native() } + ), + ( + ConcreteDataType::Duration(DurationType::Millisecond(_)), + DurationMillisecondVector, + ts_millisecond_values, + |x| { x.into_native() } + ), + ( + ConcreteDataType::Duration(DurationType::Microsecond(_)), + DurationMicrosecondVector, + ts_microsecond_values, + |x| { x.into_native() } + ), + ( + ConcreteDataType::Duration(DurationType::Nanosecond(_)), + DurationNanosecondVector, + ts_nanosecond_values, + |x| { x.into_native() } ) ) } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index e1c2b4d07138..99d783d297f0 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -27,11 +27,13 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; use crate::types::{ - BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, Float32Type, Float64Type, - Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, - IntervalType, IntervalYearMonthType, ListType, NullType, StringType, TimeMillisecondType, - TimeType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, TimestampType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, DurationMicrosecondType, + DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, + Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, + IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, ListType, NullType, StringType, + TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, TimestampType, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, }; use crate::value::Value; use crate::vectors::MutableVector; @@ -64,7 +66,10 @@ pub enum ConcreteDataType { Timestamp(TimestampType), Time(TimeType), - // Interval types: + // Duration type: + Duration(DurationType), + + // Interval type: Interval(IntervalType), // Compound types: @@ -96,6 +101,7 @@ impl fmt::Display for ConcreteDataType { ConcreteDataType::List(_) => write!(f, "List"), ConcreteDataType::Dictionary(_) => write!(f, "Dictionary"), ConcreteDataType::Interval(_) => write!(f, "Interval"), + ConcreteDataType::Duration(_) => write!(f, "Duration"), } } } @@ -325,14 +331,37 @@ impl ConcreteDataType { Self::time_datatype(TimeUnit::Nanosecond) } + /// Creates a [Duration(DurationSecondType)] datatype. + pub fn duration_second_datatype() -> Self { + ConcreteDataType::Duration(DurationType::Second(DurationSecondType)) + } + + /// Creates a [Duration(DurationMillisecondType)] datatype. + pub fn duration_millisecond_datatype() -> Self { + ConcreteDataType::Duration(DurationType::Millisecond(DurationMillisecondType)) + } + + /// Creates a [Duration(DurationMicrosecondType)] datatype. + pub fn duration_microsecond_datatype() -> Self { + ConcreteDataType::Duration(DurationType::Microsecond(DurationMicrosecondType)) + } + + /// Creates a [Duration(DurationNanosecondType)] datatype. + pub fn duration_nanosecond_datatype() -> Self { + ConcreteDataType::Duration(DurationType::Nanosecond(DurationNanosecondType)) + } + + /// Creates a [Interval(IntervalMonthDayNanoType)] datatype. pub fn interval_month_day_nano_datatype() -> Self { ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)) } + /// Creates a [Interval(IntervalYearMonthType)] datatype. pub fn interval_year_month_datatype() -> Self { ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)) } + /// Creates a [Interval(IntervalDayTimeType)] datatype. pub fn interval_day_time_datatype() -> Self { ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)) } @@ -356,6 +385,15 @@ impl ConcreteDataType { } } + pub fn duration_datatype(unit: TimeUnit) -> Self { + match unit { + TimeUnit::Second => Self::duration_second_datatype(), + TimeUnit::Millisecond => Self::duration_millisecond_datatype(), + TimeUnit::Microsecond => Self::duration_microsecond_datatype(), + TimeUnit::Nanosecond => Self::duration_nanosecond_datatype(), + } + } + pub fn interval_datatype(unit: IntervalUnit) -> Self { match unit { IntervalUnit::YearMonth => Self::interval_year_month_datatype(), diff --git a/src/datatypes/src/duration.rs b/src/datatypes/src/duration.rs new file mode 100644 index 000000000000..d6f79a7fe137 --- /dev/null +++ b/src/datatypes/src/duration.rs @@ -0,0 +1,142 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::duration::Duration; +use common_time::timestamp::TimeUnit; +use paste::paste; +use serde::{Deserialize, Serialize}; + +use crate::prelude::{Scalar, Value, ValueRef}; +use crate::scalars::ScalarRef; +use crate::types::{ + DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, + WrapperType, +}; +use crate::vectors::{ + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, +}; + +macro_rules! define_duration_with_unit { + ($unit: ident) => { + paste! { + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] + pub struct [](pub Duration); + + impl [] { + pub fn new(val: i64) -> Self { + Self(Duration::new(val, TimeUnit::$unit)) + } + } + + impl Default for [] { + fn default() -> Self { + Self::new(0) + } + } + + impl From<[]> for Value { + fn from(t: []) -> Value { + Value::Duration(t.0) + } + } + + impl From<[]> for serde_json::Value { + fn from(t: []) -> Self { + t.0.into() + } + } + + impl From<[]> for ValueRef<'static> { + fn from(t: []) -> Self { + ValueRef::Duration(t.0) + } + } + + impl Scalar for [] { + type VectorType = []; + type RefType<'a> = []; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + *self + } + + fn upcast_gat<'short, 'long: 'short>( + long: Self::RefType<'long>, + ) -> Self::RefType<'short> { + long + } + } + + impl<'a> ScalarRef<'a> for [] { + type ScalarType = []; + + fn to_owned_scalar(&self) -> Self::ScalarType { + *self + } + } + + impl WrapperType for [] { + type LogicalType = []; + type Native = i64; + + fn from_native(value: Self::Native) -> Self { + Self::new(value) + } + + fn into_native(self) -> Self::Native { + self.0.into() + } + } + + impl From for [] { + fn from(val: i64) -> Self { + []::from_native(val) + } + } + + impl From<[]> for i64{ + fn from(val: []) -> Self { + val.0.value() + } + } + } + }; +} + +define_duration_with_unit!(Second); +define_duration_with_unit!(Millisecond); +define_duration_with_unit!(Microsecond); +define_duration_with_unit!(Nanosecond); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_duration_scalar() { + let d = DurationSecond::new(456); + assert_eq!(d, d.as_scalar_ref()); + assert_eq!(d, d.to_owned_scalar()); + let d = DurationMillisecond::new(456); + assert_eq!(d, d.as_scalar_ref()); + assert_eq!(d, d.to_owned_scalar()); + let d = DurationMicrosecond::new(456); + assert_eq!(d, d.as_scalar_ref()); + assert_eq!(d, d.to_owned_scalar()); + let d = DurationNanosecond::new(456); + assert_eq!(d, d.as_scalar_ref()); + assert_eq!(d, d.to_owned_scalar()); + } +} diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 3d8a7835c4b8..3766cebf8755 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -16,6 +16,7 @@ pub mod arrow_array; pub mod data_type; +pub mod duration; pub mod error; pub mod interval; pub mod macros; diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 8a4a02589da2..3d8635a5fb6d 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -51,6 +51,12 @@ pub enum LogicalTypeId { TimeMillisecond, TimeMicrosecond, TimeNanosecond, + /// A 64-bit duration representing the elapsed time in either seconds, + /// milliseconds, microseconds or nanoseconds. + DurationSecond, + DurationMillisecond, + DurationMicrosecond, + DurationNanosecond, /// A 32-bit interval representing the elapsed time in months. IntervalYearMonth, /// A 64-bit interval representing the elapsed time in days and milliseconds. @@ -113,6 +119,10 @@ impl LogicalTypeId { LogicalTypeId::IntervalMonthDayNano => { ConcreteDataType::interval_month_day_nano_datatype() } + LogicalTypeId::DurationSecond => ConcreteDataType::duration_second_datatype(), + LogicalTypeId::DurationMillisecond => ConcreteDataType::duration_millisecond_datatype(), + LogicalTypeId::DurationMicrosecond => ConcreteDataType::duration_microsecond_datatype(), + LogicalTypeId::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index d01822a3b9e9..4f8b871da42e 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -17,6 +17,7 @@ mod boolean_type; mod date_type; mod datetime_type; mod dictionary_type; +mod duration_type; mod interval_type; mod list_type; mod null_type; @@ -30,6 +31,10 @@ pub use boolean_type::BooleanType; pub use date_type::DateType; pub use datetime_type::DateTimeType; pub use dictionary_type::DictionaryType; +pub use duration_type::{ + DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, + DurationType, +}; pub use interval_type::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, }; diff --git a/src/datatypes/src/types/duration_type.rs b/src/datatypes/src/types/duration_type.rs new file mode 100644 index 000000000000..e5255357288d --- /dev/null +++ b/src/datatypes/src/types/duration_type.rs @@ -0,0 +1,182 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::datatypes::{ + DataType as ArrowDataType, DurationMicrosecondType as ArrowDurationMicrosecondType, + DurationMillisecondType as ArrowDurationMillisecondType, + DurationNanosecondType as ArrowDurationNanosecondType, + DurationSecondType as ArrowDurationSecondType, TimeUnit as ArrowTimeUnit, +}; +use common_time::duration::Duration; +use common_time::timestamp::TimeUnit; +use enum_dispatch::enum_dispatch; +use paste::paste; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use super::LogicalPrimitiveType; +use crate::data_type::DataType; +use crate::duration::{ + DurationMicrosecond, DurationMillisecond, DurationNanosecond, DurationSecond, +}; +use crate::error; +use crate::prelude::{ + ConcreteDataType, LogicalTypeId, MutableVector, ScalarVectorBuilder, Value, ValueRef, Vector, +}; +use crate::vectors::{ + DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector, + DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder, + DurationSecondVector, DurationSecondVectorBuilder, PrimitiveVector, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[enum_dispatch(DataType)] +pub enum DurationType { + Second(DurationSecondType), + Millisecond(DurationMillisecondType), + Microsecond(DurationMicrosecondType), + Nanosecond(DurationNanosecondType), +} + +impl DurationType { + /// Returns the [`TimeUnit`] of this type. + pub fn unit(&self) -> TimeUnit { + match self { + DurationType::Second(_) => TimeUnit::Second, + DurationType::Millisecond(_) => TimeUnit::Millisecond, + DurationType::Microsecond(_) => TimeUnit::Microsecond, + DurationType::Nanosecond(_) => TimeUnit::Nanosecond, + } + } +} + +macro_rules! impl_data_type_for_duration { + ($unit: ident) => { + paste! { + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] + pub struct []; + + impl DataType for [] { + fn name(&self) -> &str { + stringify!([]) + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::[] + } + + fn default_value(&self) -> Value { + Value::Duration(Duration::new(0, TimeUnit::$unit)) + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Duration(ArrowTimeUnit::$unit) + } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new([]::with_capacity(capacity)) + } + + fn is_timestamp_compatible(&self) -> bool { + false + } + } + + impl LogicalPrimitiveType for [] { + type ArrowPrimitive = []; + type Native = i64; + type Wrapper = []; + type LargestType = Self; + + fn build_data_type() -> ConcreteDataType { + ConcreteDataType::Duration(DurationType::$unit( + []::default(), + )) + } + + fn type_name() -> &'static str { + stringify!([]) + } + + fn cast_vector(vector: &dyn Vector) -> crate::Result<&PrimitiveVector> { + vector + .as_any() + .downcast_ref::<[]>() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to cast {} to {}", + vector.vector_type_name(), stringify!([]) + ), + }) + } + + fn cast_value_ref(value: ValueRef) -> crate::Result> { + match value { + ValueRef::Null => Ok(None), + ValueRef::Int64(v) =>{ + Ok(Some([]::from(v))) + } + ValueRef::Duration(t) => match t.unit() { + TimeUnit::$unit => Ok(Some([](t))), + other => error::CastTypeSnafu { + msg: format!( + "Failed to cast Duration value with different unit {:?} to {}", + other, stringify!([]) + ), + } + .fail(), + }, + other => error::CastTypeSnafu { + msg: format!("Failed to cast value {:?} to {}", other, stringify!([])), + } + .fail(), + } + } + } + } + } +} + +impl_data_type_for_duration!(Second); +impl_data_type_for_duration!(Millisecond); +impl_data_type_for_duration!(Microsecond); +impl_data_type_for_duration!(Nanosecond); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_duration_type_unit() { + assert_eq!( + TimeUnit::Second, + DurationType::Second(DurationSecondType).unit() + ); + + assert_eq!( + TimeUnit::Millisecond, + DurationType::Millisecond(DurationMillisecondType).unit() + ); + + assert_eq!( + TimeUnit::Microsecond, + DurationType::Microsecond(DurationMicrosecondType).unit() + ); + + assert_eq!( + TimeUnit::Nanosecond, + DurationType::Nanosecond(DurationNanosecondType).unit() + ); + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 4d672a8ceacd..b14acfa9115c 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -25,7 +25,7 @@ use common_time::datetime::DateTime; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::{TimeUnit, Timestamp}; -use common_time::Interval; +use common_time::{Duration, Interval}; use datafusion_common::ScalarValue; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; @@ -70,6 +70,7 @@ pub enum Value { DateTime(DateTime), Timestamp(Timestamp), Time(Time), + Duration(Duration), Interval(Interval), List(ListValue), @@ -104,6 +105,7 @@ impl Display for Value { Value::Timestamp(v) => write!(f, "{}", v.to_iso8601_string()), Value::Time(t) => write!(f, "{}", t.to_iso8601_string()), Value::Interval(v) => write!(f, "{}", v.to_iso8601_string()), + Value::Duration(d) => write!(f, "{d}"), Value::List(v) => { let default = Box::>::default(); let items = v.items().as_ref().unwrap_or(&default); @@ -145,6 +147,7 @@ impl Value { Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()), Value::Interval(v) => ConcreteDataType::interval_datatype(v.unit()), Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), + Value::Duration(d) => ConcreteDataType::duration_datatype(d.unit()), } } @@ -188,6 +191,7 @@ impl Value { Value::Timestamp(v) => ValueRef::Timestamp(*v), Value::Time(v) => ValueRef::Time(*v), Value::Interval(v) => ValueRef::Interval(*v), + Value::Duration(v) => ValueRef::Duration(*v), } } @@ -246,6 +250,12 @@ impl Value { IntervalUnit::DayTime => LogicalTypeId::IntervalDayTime, IntervalUnit::MonthDayNano => LogicalTypeId::IntervalMonthDayNano, }, + Value::Duration(d) => match d.unit() { + TimeUnit::Second => LogicalTypeId::DurationSecond, + TimeUnit::Millisecond => LogicalTypeId::DurationMillisecond, + TimeUnit::Microsecond => LogicalTypeId::DurationMicrosecond, + TimeUnit::Nanosecond => LogicalTypeId::DurationNanosecond, + }, } } @@ -292,6 +302,7 @@ impl Value { IntervalUnit::DayTime => ScalarValue::IntervalDayTime(Some(v.to_i64())), IntervalUnit::MonthDayNano => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())), }, + Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())), }; Ok(scalar_value) @@ -330,6 +341,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result time_to_scalar_value(t.unit(), None)?, + ConcreteDataType::Duration(d) => duration_to_scalar_value(d.unit(), None), }) } @@ -362,6 +374,16 @@ pub fn time_to_scalar_value(unit: TimeUnit, val: Option) -> Result) -> ScalarValue { + match unit { + TimeUnit::Second => ScalarValue::DurationSecond(val), + TimeUnit::Millisecond => ScalarValue::DurationMillisecond(val), + TimeUnit::Microsecond => ScalarValue::DurationMicrosecond(val), + TimeUnit::Nanosecond => ScalarValue::DurationMicrosecond(val), + } +} + /// Convert [ScalarValue] to [Timestamp]. /// Return `None` if given scalar value cannot be converted to a valid timestamp. pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option { @@ -419,6 +441,7 @@ macro_rules! impl_ord_for_value_like { ($Type::Timestamp(v1), $Type::Timestamp(v2)) => v1.cmp(v2), ($Type::Time(v1), $Type::Time(v2)) => v1.cmp(v2), ($Type::Interval(v1), $Type::Interval(v2)) => v1.cmp(v2), + ($Type::Duration(v1), $Type::Duration(v2)) => v1.cmp(v2), ($Type::List(v1), $Type::List(v2)) => v1.cmp(v2), _ => panic!( "Cannot compare different values {:?} and {:?}", @@ -536,6 +559,7 @@ impl_value_from!(Time, Time); impl_value_from!(DateTime, DateTime); impl_value_from!(Timestamp, Timestamp); impl_value_from!(Interval, Interval); +impl_value_from!(Duration, Duration); impl_value_from!(String, String); impl From<&str> for Value { @@ -581,6 +605,7 @@ impl TryFrom for serde_json::Value { Value::Timestamp(v) => serde_json::to_value(v.value())?, Value::Time(v) => serde_json::to_value(v.value())?, Value::Interval(v) => serde_json::to_value(v.to_i128())?, + Value::Duration(v) => serde_json::to_value(v.value())?, }; Ok(json_value) @@ -740,11 +765,19 @@ impl TryFrom for Value { ScalarValue::IntervalMonthDayNano(t) => t .map(|x| Value::Interval(Interval::from_i128(x))) .unwrap_or(Value::Null), + ScalarValue::DurationSecond(d) => d + .map(|x| Value::Duration(Duration::new(x, TimeUnit::Second))) + .unwrap_or(Value::Null), + ScalarValue::DurationMillisecond(d) => d + .map(|x| Value::Duration(Duration::new(x, TimeUnit::Millisecond))) + .unwrap_or(Value::Null), + ScalarValue::DurationMicrosecond(d) => d + .map(|x| Value::Duration(Duration::new(x, TimeUnit::Microsecond))) + .unwrap_or(Value::Null), + ScalarValue::DurationNanosecond(d) => d + .map(|x| Value::Duration(Duration::new(x, TimeUnit::Nanosecond))) + .unwrap_or(Value::Null), ScalarValue::Decimal128(_, _, _) - | ScalarValue::DurationSecond(_) - | ScalarValue::DurationMillisecond(_) - | ScalarValue::DurationMicrosecond(_) - | ScalarValue::DurationNanosecond(_) | ScalarValue::Struct(_, _) | ScalarValue::Dictionary(_, _) => { return error::UnsupportedArrowTypeSnafu { @@ -811,6 +844,7 @@ pub enum ValueRef<'a> { DateTime(DateTime), Timestamp(Timestamp), Time(Time), + Duration(Duration), Interval(Interval), // Compound types: @@ -987,6 +1021,7 @@ impl_value_ref_from!(DateTime, DateTime); impl_value_ref_from!(Timestamp, Timestamp); impl_value_ref_from!(Time, Time); impl_value_ref_from!(Interval, Interval); +impl_value_ref_from!(Duration, Duration); impl<'a> From<&'a str> for ValueRef<'a> { fn from(string: &'a str) -> ValueRef<'a> { diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 0e4c40fafdf0..d69d1bb82926 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -30,6 +30,7 @@ mod boolean; mod constant; mod date; mod datetime; +mod duration; mod eq; mod helper; mod interval; @@ -47,6 +48,11 @@ pub use boolean::{BooleanVector, BooleanVectorBuilder}; pub use constant::ConstantVector; pub use date::{DateVector, DateVectorBuilder}; pub use datetime::{DateTimeVector, DateTimeVectorBuilder}; +pub use duration::{ + DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector, + DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder, + DurationSecondVector, DurationSecondVectorBuilder, +}; pub use helper::Helper; pub use interval::{ IntervalDayTimeVector, IntervalDayTimeVectorBuilder, IntervalMonthDayNanoVector, diff --git a/src/datatypes/src/vectors/duration.rs b/src/datatypes/src/vectors/duration.rs new file mode 100644 index 000000000000..010c57140976 --- /dev/null +++ b/src/datatypes/src/vectors/duration.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::types::{ + DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, +}; +use crate::vectors::{PrimitiveVector, PrimitiveVectorBuilder}; + +pub type DurationSecondVector = PrimitiveVector; +pub type DurationSecondVectorBuilder = PrimitiveVectorBuilder; + +pub type DurationMillisecondVector = PrimitiveVector; +pub type DurationMillisecondVectorBuilder = PrimitiveVectorBuilder; + +pub type DurationMicrosecondVector = PrimitiveVector; +pub type DurationMicrosecondVectorBuilder = PrimitiveVectorBuilder; + +pub type DurationNanosecondVector = PrimitiveVector; +pub type DurationNanosecondVectorBuilder = PrimitiveVectorBuilder; diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 4adb0df34df5..34c8b016aaae 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_time::interval::IntervalUnit; use crate::data_type::DataType; -use crate::types::{TimeType, TimestampType}; +use crate::types::{DurationType, TimeType, TimestampType}; use crate::vectors::constant::ConstantVector; use crate::vectors::{ BinaryVector, BooleanVector, DateTimeVector, DateVector, IntervalDayTimeVector, @@ -136,6 +136,20 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { is_vector_eq!(TimeNanosecondVector, lhs, rhs) } }, + Duration(d) => match d { + DurationType::Second(_) => { + is_vector_eq!(TimeSecondVector, lhs, rhs) + } + DurationType::Millisecond(_) => { + is_vector_eq!(TimeMillisecondVector, lhs, rhs) + } + DurationType::Microsecond(_) => { + is_vector_eq!(TimeMicrosecondVector, lhs, rhs) + } + DurationType::Nanosecond(_) => { + is_vector_eq!(TimeNanosecondVector, lhs, rhs) + } + }, } } diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs new file mode 100644 index 000000000000..12b16c06b035 --- /dev/null +++ b/src/mito2/src/proto_util.rs @@ -0,0 +1,241 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to process protobuf messages. + +use common_time::timestamp::TimeUnit; +use datatypes::prelude::ConcreteDataType; +use datatypes::types::{TimeType, TimestampType}; +use datatypes::value::Value; +use greptime_proto::v1::{self, ColumnDataType}; +use store_api::storage::OpType; + +use crate::metadata::SemanticType; + +/// Returns true if the pb semantic type is valid. +pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool { + type_value == semantic_type as i32 +} + +/// Returns true if the pb type value is valid. +pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { + let Some(column_type) = ColumnDataType::from_i32(type_value) else { + return false; + }; + + is_column_type_eq(column_type, expect_type) +} + +/// Convert value into proto's value. +pub(crate) fn to_proto_value(value: Value) -> Option { + let proto_value = match value { + Value::Null => v1::Value { value: None }, + Value::Boolean(v) => v1::Value { + value: Some(v1::value::Value::BoolValue(v)), + }, + Value::UInt8(v) => v1::Value { + value: Some(v1::value::Value::U8Value(v.into())), + }, + Value::UInt16(v) => v1::Value { + value: Some(v1::value::Value::U16Value(v.into())), + }, + Value::UInt32(v) => v1::Value { + value: Some(v1::value::Value::U32Value(v)), + }, + Value::UInt64(v) => v1::Value { + value: Some(v1::value::Value::U64Value(v)), + }, + Value::Int8(v) => v1::Value { + value: Some(v1::value::Value::I8Value(v.into())), + }, + Value::Int16(v) => v1::Value { + value: Some(v1::value::Value::I16Value(v.into())), + }, + Value::Int32(v) => v1::Value { + value: Some(v1::value::Value::I32Value(v)), + }, + Value::Int64(v) => v1::Value { + value: Some(v1::value::Value::I64Value(v)), + }, + Value::Float32(v) => v1::Value { + value: Some(v1::value::Value::F32Value(*v)), + }, + Value::Float64(v) => v1::Value { + value: Some(v1::value::Value::F64Value(*v)), + }, + Value::String(v) => v1::Value { + value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())), + }, + Value::Binary(v) => v1::Value { + value: Some(v1::value::Value::BinaryValue(v.to_vec())), + }, + Value::Date(v) => v1::Value { + value: Some(v1::value::Value::DateValue(v.val())), + }, + Value::DateTime(v) => v1::Value { + value: Some(v1::value::Value::DatetimeValue(v.val())), + }, + Value::Timestamp(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value: Some(v1::value::Value::TsSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value: Some(v1::value::Value::TsMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value: Some(v1::value::Value::TsNanosecondValue(v.value())), + }, + }, + Value::Time(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value: Some(v1::value::Value::TimeSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value: Some(v1::value::Value::TimeMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value: Some(v1::value::Value::TimeMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value: Some(v1::value::Value::TimeNanosecondValue(v.value())), + }, + }, + Value::Duration(_) | Value::Interval(_) | Value::List(_) => return None, + }; + + Some(proto_value) +} + +/// Returns the [ColumnDataType] of the value. +/// +/// If value is null, returns `None`. +pub(crate) fn proto_value_type(value: &v1::Value) -> Option { + let value_data = value.value.as_ref()?; + let value_type = match value_data { + v1::value::Value::I8Value(_) => ColumnDataType::Int8, + v1::value::Value::I16Value(_) => ColumnDataType::Int16, + v1::value::Value::I32Value(_) => ColumnDataType::Int32, + v1::value::Value::I64Value(_) => ColumnDataType::Int64, + v1::value::Value::U8Value(_) => ColumnDataType::Uint8, + v1::value::Value::U16Value(_) => ColumnDataType::Uint16, + v1::value::Value::U32Value(_) => ColumnDataType::Uint32, + v1::value::Value::U64Value(_) => ColumnDataType::Uint64, + v1::value::Value::F32Value(_) => ColumnDataType::Float32, + v1::value::Value::F64Value(_) => ColumnDataType::Float64, + v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, + v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, + v1::value::Value::StringValue(_) => ColumnDataType::String, + v1::value::Value::DateValue(_) => ColumnDataType::Date, + v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, + v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, + v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, + v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, + v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, + v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + }; + Some(value_type) +} + +// TODO(yingwen): Support conversion in greptime-proto. +/// Creates value for i64. +#[cfg(test)] +pub(crate) fn i64_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::I64Value(data)), + } +} + +/// Creates value for timestamp millis. +#[cfg(test)] +pub(crate) fn ts_ms_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(data)), + } +} + +/// Convert [ConcreteDataType] to [ColumnDataType]. +pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { + let column_data_type = match data_type { + ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, + ConcreteDataType::Int8(_) => ColumnDataType::Int8, + ConcreteDataType::Int16(_) => ColumnDataType::Int16, + ConcreteDataType::Int32(_) => ColumnDataType::Int32, + ConcreteDataType::Int64(_) => ColumnDataType::Int64, + ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, + ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, + ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, + ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, + ConcreteDataType::Float32(_) => ColumnDataType::Float32, + ConcreteDataType::Float64(_) => ColumnDataType::Float64, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::String(_) => ColumnDataType::String, + ConcreteDataType::Date(_) => ColumnDataType::Date, + ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, + ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + ColumnDataType::TimestampMillisecond + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + ColumnDataType::TimestampMicrosecond + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + ColumnDataType::TimestampNanosecond + } + ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, + ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, + ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, + ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, + ConcreteDataType::Null(_) + | ConcreteDataType::Duration(_) + | ConcreteDataType::Interval(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => return None, + }; + + Some(column_data_type) +} + +/// Convert semantic type to proto's semantic type +pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType { + match semantic_type { + SemanticType::Tag => v1::SemanticType::Tag, + SemanticType::Field => v1::SemanticType::Field, + SemanticType::Timestamp => v1::SemanticType::Timestamp, + } +} + +/// Convert op type to proto's op type. +pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType { + match op_type { + OpType::Delete => v1::mito::OpType::Delete, + OpType::Put => v1::mito::OpType::Put, + } +} + +/// Returns true if the column type is equal to expected type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + if let Some(expect) = to_column_data_type(expect_type) { + column_type == expect + } else { + false + } +} + +// TODO(yingwen): Tests. diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 973d3877ab43..bb3ebfcd1d08 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -186,7 +186,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Timestamp(v) => row_writer .write_col(v.to_timezone_aware_string(query_context.time_zone()))?, Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?, - Value::List(_) => { + Value::List(_) | Value::Duration(_) => { return Err(Error::Internal { err_msg: format!( "cannot write value {:?} in mysql protocol: unimplemented", diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 26a2a5bcdfd7..f33b42ebd618 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -103,12 +103,14 @@ pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWir } } Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)), - Value::List(_) => Err(PgWireError::ApiError(Box::new(Error::Internal { - err_msg: format!( - "cannot write value {:?} in postgres protocol: unimplemented", - &value - ), - }))), + Value::List(_) | Value::Duration(_) => { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!( + "cannot write value {:?} in postgres protocol: unimplemented", + &value + ), + }))) + } } } @@ -129,7 +131,9 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), &ConcreteDataType::Time(_) => Ok(Type::TIME), &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL), - &ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu { + &ConcreteDataType::Duration(_) + | &ConcreteDataType::List(_) + | &ConcreteDataType::Dictionary(_) => error::InternalSnafu { err_msg: format!("not implemented for column datatype {origin:?}"), } .fail(), diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 9573a69e54e6..dfe6b2dc9af3 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -419,11 +419,14 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu Some(time_type.precision()), TimezoneInfo::None, )), + ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval), ConcreteDataType::Binary(_) => Ok(SqlDataType::Varbinary(None)), - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + ConcreteDataType::Duration(_) + | ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => { unreachable!() } - ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval), } } From a7fe97022ed88a0264f3d4f5bef1304d7cc48359 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 14 Aug 2023 20:50:25 +0800 Subject: [PATCH 2/9] feat: duration and grpc. --- src/api/src/helper.rs | 40 ++++++++++++++++++++++++++++--- src/common/grpc/src/select.rs | 8 +++---- src/datatypes/src/value.rs | 5 ++++ src/frontend/src/instance/grpc.rs | 1 + src/mito2/src/row_converter.rs | 23 +++++++++++++++--- src/servers/src/mysql/writer.rs | 4 +++- 6 files changed, 70 insertions(+), 11 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 238894cd6746..96266dbcebb4 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -22,7 +22,7 @@ use common_time::{Date, DateTime, Interval, Timestamp}; use datatypes::prelude::{ConcreteDataType, ValueRef}; use datatypes::scalars::ScalarVector; use datatypes::types::{ - Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, + Int16Type, Int8Type, DurationType, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, }; use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::{ @@ -100,6 +100,14 @@ impl From for ConcreteDataType { ColumnDataType::IntervalMonthDayNano => { ConcreteDataType::interval_month_day_nano_datatype() } + ColumnDataType::DurationSecond => ConcreteDataType::duration_second_datatype(), + ColumnDataType::DurationMillisecond => { + ConcreteDataType::duration_millisecond_datatype() + } + ColumnDataType::DurationMicrosecond => { + ConcreteDataType::duration_microsecond_datatype() + } + ColumnDataType::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), } } } @@ -146,7 +154,12 @@ impl TryFrom for ColumnDataTypeWrapper { | ConcreteDataType::Dictionary(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } - ConcreteDataType::Duration(_) => todo!("duration is not supported yet"), + ConcreteDataType::Duration(d) => match d { + DurationType::Second(_) => ColumnDataType::DurationSecond, + DurationType::Millisecond(_) => ColumnDataType::DurationMillisecond, + DurationType::Microsecond(_) => ColumnDataType::DurationMicrosecond, + DurationType::Nanosecond(_) => ColumnDataType::DurationNanosecond, + }, }); Ok(datatype) } @@ -258,6 +271,22 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values interval_month_day_nano_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::DurationSecond => Values { + dur_second_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::DurationMillisecond => Values { + dur_millisecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::DurationMicrosecond => Values { + dur_microsecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::DurationNanosecond => Values { + dur_nanosecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } @@ -305,7 +334,12 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { .interval_month_day_nano_values .push(convert_i128_to_interval(val.to_i128())), }, - Value::Duration(_) => todo!("duration is not supported yet"), + Value::Duration(val) => match val.unit() { + TimeUnit::Second => values.dur_second_values.push(val.value()), + TimeUnit::Millisecond => values.dur_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.dur_microsecond_values.push(val.value()), + TimeUnit::Nanosecond => values.dur_nanosecond_values.push(val.value()), + }, Value::List(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 631cb02d4d84..9716b2348366 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -217,25 +217,25 @@ pub fn values(arrays: &[VectorRef]) -> Result { ( ConcreteDataType::Duration(DurationType::Second(_)), DurationSecondVector, - ts_second_values, + dur_second_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Millisecond(_)), DurationMillisecondVector, - ts_millisecond_values, + dur_millisecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Microsecond(_)), DurationMicrosecondVector, - ts_microsecond_values, + dur_microsecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Nanosecond(_)), DurationNanosecondVector, - ts_nanosecond_values, + dur_nanosecond_values, |x| { x.into_native() } ) ) diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index b14acfa9115c..6b0df9778543 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -812,6 +812,7 @@ impl From> for Value { ValueRef::Timestamp(v) => Value::Timestamp(v), ValueRef::Time(v) => Value::Time(v), ValueRef::Interval(v) => Value::Interval(v), + ValueRef::Duration(v) => Value::Duration(v), ValueRef::List(v) => v.to_value(), } } @@ -963,6 +964,10 @@ impl<'a> ValueRef<'a> { impl_as_for_value_ref!(self, Time) } + pub fn as_duration(&self) -> Result> { + impl_as_for_value_ref!(self, Duration) + } + /// Cast itself to [Interval]. pub fn as_interval(&self) -> Result> { impl_as_for_value_ref!(self, Interval) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 74947581dc2d..0f1cab8b9107 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -49,6 +49,7 @@ impl GrpcQueryHandler for Instance { .context(PermissionSnafu)?; let output = match request { + Request::RowInserts(_) | Request::RowDelete(_) => unimplemented!(), Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?, Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 3b0c37ec2d78..f1583c7ba570 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -15,7 +15,7 @@ use bytes::Buf; use common_base::bytes::Bytes; use common_time::time::Time; -use common_time::{Date, Interval}; +use common_time::{Date, Duration, Interval}; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::Value; use datatypes::value::ValueRef; @@ -72,6 +72,7 @@ impl SortField { ConcreteDataType::DateTime(_) => 9, ConcreteDataType::Timestamp(_) => 10, ConcreteDataType::Time(_) => 10, + ConcreteDataType::Duration(_) => 10, ConcreteDataType::Interval(_) => 18, ConcreteDataType::Null(_) | ConcreteDataType::List(_) @@ -136,7 +137,8 @@ impl SortField { Date, date, DateTime, datetime, Time, time, - Interval, interval + Interval, interval, + Duration, duration ); Ok(()) @@ -201,7 +203,8 @@ impl SortField { Date, Date, Time, Time, DateTime, DateTime, - Interval, Interval + Interval, Interval, + Duration, Duration ) } } @@ -311,6 +314,20 @@ mod tests { ); } + #[test] + fn test_memcmp_duration() { + check_encode_and_decode( + &[ + ConcreteDataType::duration_millisecond_datatype(), + ConcreteDataType::int64_datatype(), + ], + &[vec![ + Value::Duration(Duration::new_millisecond(44)), + Value::Int64(45), + ]], + ) + } + #[test] fn test_memcmp_binary() { check_encode_and_decode( diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index bb3ebfcd1d08..1a82add67580 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -186,7 +186,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Timestamp(v) => row_writer .write_col(v.to_timezone_aware_string(query_context.time_zone()))?, Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?, - Value::List(_) | Value::Duration(_) => { + Value::Duration(v) => row_writer.write_col(std::time::Duration::from(v))?, + Value::List(_) => { return Err(Error::Internal { err_msg: format!( "cannot write value {:?} in mysql protocol: unimplemented", @@ -241,6 +242,7 @@ pub(crate) fn create_mysql_column( ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE), ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR), + ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME), _ => error::InternalSnafu { err_msg: format!("not implemented for column datatype {:?}", data_type), } From f758f558379c8a341f85dfebd3a6c4a6eef0d633 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 15 Aug 2023 14:42:59 +0800 Subject: [PATCH 3/9] test: add unit test cases. --- src/api/src/helper.rs | 64 ++++++++++++- src/common/grpc-expr/src/insert.rs | 79 +++++++++++++++- src/common/grpc/src/select.rs | 10 ++ src/common/time/src/duration.rs | 4 + src/datatypes/src/data_type.rs | 37 +++++++- src/datatypes/src/types/duration_type.rs | 33 +++++++ src/datatypes/src/value.rs | 113 ++++++++++++++++++++++- src/datatypes/src/vectors/eq.rs | 33 ++++--- src/servers/src/mysql/writer.rs | 2 +- 9 files changed, 354 insertions(+), 21 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 96266dbcebb4..de19820da1f7 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -921,10 +921,11 @@ mod tests { UInt32Type, }; use datatypes::vectors::{ - BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, - TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, - TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, - TimestampSecondVector, Vector, + BooleanVector, DurationMicrosecondVector, DurationMillisecondVector, + DurationNanosecondVector, DurationSecondVector, IntervalDayTimeVector, + IntervalMonthDayNanoVector, IntervalYearMonthVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, + TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector, }; use paste::paste; @@ -999,6 +1000,10 @@ mod tests { let values = values_with_capacity(ColumnDataType::IntervalMonthDayNano, 2); let values = values.interval_month_day_nano_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::DurationMillisecond, 2); + let values = values.dur_millisecond_values; + assert_eq!(2, values.capacity()); } #[test] @@ -1083,6 +1088,10 @@ mod tests { ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano), ColumnDataTypeWrapper(ColumnDataType::IntervalMonthDayNano).into() ); + assert_eq!( + ConcreteDataType::duration_millisecond_datatype(), + ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond).into() + ) } #[test] @@ -1171,6 +1180,12 @@ mod tests { .try_into() .unwrap() ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::DurationMillisecond), + ConcreteDataType::duration_millisecond_datatype() + .try_into() + .unwrap() + ); let result: Result = ConcreteDataType::null_datatype().try_into(); assert!(result.is_err()); @@ -1314,6 +1329,47 @@ mod tests { }); } + #[test] + fn test_column_put_duration_values() { + let mut column = Column { + column_name: "test".to_string(), + semantic_type: 0, + values: Some(Values { + ..Default::default() + }), + null_mask: vec![], + datatype: 0, + }; + + let vector = Arc::new(DurationNanosecondVector::from_vec(vec![1, 2, 3])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![1, 2, 3], + column.values.as_ref().unwrap().dur_nanosecond_values + ); + + let vector = Arc::new(DurationMicrosecondVector::from_vec(vec![7, 8, 9])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![7, 8, 9], + column.values.as_ref().unwrap().dur_microsecond_values + ); + + let vector = Arc::new(DurationMillisecondVector::from_vec(vec![4, 5, 6])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![4, 5, 6], + column.values.as_ref().unwrap().dur_millisecond_values + ); + + let vector = Arc::new(DurationSecondVector::from_vec(vec![10, 11, 12])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![10, 11, 12], + column.values.as_ref().unwrap().dur_second_values + ); + } + #[test] fn test_column_put_vector() { use crate::v1::SemanticType; diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index b2a0f4452528..fd38020768ea 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -159,8 +159,8 @@ mod tests { ); let column_defs = create_expr.column_defs; - assert_eq!(column_defs[5].name, create_expr.time_index); - assert_eq!(6, column_defs.len()); + assert_eq!(column_defs[6].name, create_expr.time_index); + assert_eq!(7, column_defs.len()); assert_eq!( ConcreteDataType::string_datatype(), @@ -232,6 +232,20 @@ mod tests { ) ); + assert_eq!( + ConcreteDataType::duration_millisecond_datatype(), + ConcreteDataType::from( + ColumnDataTypeWrapper::try_new( + column_defs + .iter() + .find(|c| c.name == "duration") + .unwrap() + .datatype + ) + .unwrap() + ) + ); + assert_eq!( ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::from( @@ -265,7 +279,7 @@ mod tests { let add_columns = find_new_columns(&schema, &insert_batch.0).unwrap().unwrap(); - assert_eq!(4, add_columns.add_columns.len()); + assert_eq!(5, add_columns.add_columns.len()); let host_column = &add_columns.add_columns[0]; assert_eq!( ConcreteDataType::string_datatype(), @@ -305,6 +319,52 @@ mod tests { .unwrap() ) ); + + let duration_column = &add_columns.add_columns[4]; + assert!(!duration_column.is_key); + + assert_eq!( + ConcreteDataType::duration_millisecond_datatype(), + ConcreteDataType::from( + ColumnDataTypeWrapper::try_new( + duration_column.column_def.as_ref().unwrap().datatype + ) + .unwrap() + ) + ); + } + + #[test] + fn test_convert_duration_values() { + // second + let actual = convert_values( + &ConcreteDataType::Duration(DurationType::Second(DurationSecondType)), + Values { + dur_second_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Duration(Duration::new_second(1_i64)), + Value::Duration(Duration::new_second(2_i64)), + Value::Duration(Duration::new_second(3_i64)), + ]; + assert_eq!(expect, actual); + + // millisecond + let actual = convert_values( + &ConcreteDataType::Duration(DurationType::Millisecond(DurationMillisecondType)), + Values { + dur_millisecond_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Duration(Duration::new_millisecond(1_i64)), + Value::Duration(Duration::new_millisecond(2_i64)), + Value::Duration(Duration::new_millisecond(3_i64)), + ]; + assert_eq!(expect, actual); } #[test] @@ -394,6 +454,18 @@ mod tests { datatype: ColumnDataType::IntervalMonthDayNano as i32, }; + let duration_vals = Values { + dur_millisecond_values: vec![100, 101], + ..Default::default() + }; + let duration_column = Column { + column_name: "duration".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(duration_vals), + null_mask: vec![0], + datatype: ColumnDataType::DurationMillisecond as i32, + }; + let ts_vals = Values { timestamp_millisecond_values: vec![100, 101], ..Default::default() @@ -413,6 +485,7 @@ mod tests { mem_column, time_column, interval_column, + duration_column, ts_column, ], row_count, diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 9716b2348366..764ae6a3ad5c 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -304,6 +304,16 @@ mod tests { }) } + #[test] + fn test_convert_arrow_array_duration_second() { + let array = DurationSecondVector::from(vec![Some(1), Some(2), None, Some(3)]); + let array: VectorRef = Arc::new(array); + + let values = values(&[array]).unwrap(); + + assert_eq!(vec![1, 2, 3], values.dur_second_values); + } + #[test] fn test_convert_arrow_arrays_string() { let array = StringVector::from(vec![ diff --git a/src/common/time/src/duration.rs b/src/common/time/src/duration.rs index 5689423e1276..b109bf05d672 100644 --- a/src/common/time/src/duration.rs +++ b/src/common/time/src/duration.rs @@ -87,6 +87,10 @@ impl Duration { let nsec = u32::try_from(sec_mod * nsec_mul).unwrap(); (sec_div, nsec) } + + pub fn to_std_duration(self) -> std::time::Duration { + self.into() + } } /// Convert i64 to Duration Type. diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 99d783d297f0..3906312ad2d2 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -129,6 +129,7 @@ impl ConcreteDataType { | ConcreteDataType::Timestamp(_) | ConcreteDataType::Time(_) | ConcreteDataType::Interval(_) + | ConcreteDataType::Duration(_) ) } @@ -144,6 +145,7 @@ impl ConcreteDataType { | ConcreteDataType::Timestamp(_) | ConcreteDataType::Time(_) | ConcreteDataType::Interval(_) + | ConcreteDataType::Duration(_) ) } @@ -217,6 +219,15 @@ impl ConcreteDataType { let array = arrow_array::new_empty_array(&self.as_arrow_type()); arrow_array_cast(array.as_ref(), &to_type.as_arrow_type()).is_ok() } + + /// Try to cast data type as a [`DurationType`]. + pub fn as_duration(&self) -> Option { + match self { + ConcreteDataType::Int64(_) => Some(DurationType::Millisecond(DurationMillisecondType)), + ConcreteDataType::Duration(d) => Some(*d), + _ => None, + } + } } impl From<&ConcreteDataType> for ConcreteDataType { @@ -258,6 +269,9 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { } ArrowDataType::Time32(u) => ConcreteDataType::Time(TimeType::from_unit(u.into())), ArrowDataType::Time64(u) => ConcreteDataType::Time(TimeType::from_unit(u.into())), + ArrowDataType::Duration(u) => { + ConcreteDataType::Duration(DurationType::from_unit(u.into())) + } _ => { return error::UnsupportedArrowTypeSnafu { arrow_type: dt.clone(), @@ -588,6 +602,10 @@ mod tests { assert!(!ConcreteDataType::time_millisecond_datatype().is_timestamp_compatible()); assert!(!ConcreteDataType::time_microsecond_datatype().is_timestamp_compatible()); assert!(!ConcreteDataType::time_nanosecond_datatype().is_timestamp_compatible()); + assert!(!ConcreteDataType::duration_second_datatype().is_timestamp_compatible()); + assert!(!ConcreteDataType::duration_millisecond_datatype().is_timestamp_compatible()); + assert!(!ConcreteDataType::duration_microsecond_datatype().is_timestamp_compatible()); + assert!(!ConcreteDataType::duration_nanosecond_datatype().is_timestamp_compatible()); } #[test] @@ -629,6 +647,11 @@ mod tests { assert!(ConcreteDataType::interval_year_month_datatype().is_stringifiable()); assert!(ConcreteDataType::interval_day_time_datatype().is_stringifiable()); assert!(ConcreteDataType::interval_month_day_nano_datatype().is_stringifiable()); + + assert!(ConcreteDataType::duration_second_datatype().is_stringifiable()); + assert!(ConcreteDataType::duration_millisecond_datatype().is_stringifiable()); + assert!(ConcreteDataType::duration_microsecond_datatype().is_stringifiable()); + assert!(ConcreteDataType::duration_nanosecond_datatype().is_stringifiable()); } #[test] @@ -650,6 +673,10 @@ mod tests { assert!(ConcreteDataType::interval_year_month_datatype().is_signed()); assert!(ConcreteDataType::interval_day_time_datatype().is_signed()); assert!(ConcreteDataType::interval_month_day_nano_datatype().is_signed()); + assert!(ConcreteDataType::duration_second_datatype().is_signed()); + assert!(ConcreteDataType::duration_millisecond_datatype().is_signed()); + assert!(ConcreteDataType::duration_microsecond_datatype().is_signed()); + assert!(ConcreteDataType::duration_nanosecond_datatype().is_signed()); assert!(!ConcreteDataType::uint8_datatype().is_signed()); assert!(!ConcreteDataType::uint16_datatype().is_signed()); @@ -679,6 +706,10 @@ mod tests { assert!(!ConcreteDataType::interval_year_month_datatype().is_unsigned()); assert!(!ConcreteDataType::interval_day_time_datatype().is_unsigned()); assert!(!ConcreteDataType::interval_month_day_nano_datatype().is_unsigned()); + assert!(!ConcreteDataType::duration_second_datatype().is_unsigned()); + assert!(!ConcreteDataType::duration_millisecond_datatype().is_unsigned()); + assert!(!ConcreteDataType::duration_microsecond_datatype().is_unsigned()); + assert!(!ConcreteDataType::duration_nanosecond_datatype().is_unsigned()); assert!(ConcreteDataType::uint8_datatype().is_unsigned()); assert!(ConcreteDataType::uint16_datatype().is_unsigned()); @@ -787,6 +818,10 @@ mod tests { )) .to_string(), "Interval" - ) + ); + assert_eq!( + ConcreteDataType::duration_second_datatype().to_string(), + "Duration" + ); } } diff --git a/src/datatypes/src/types/duration_type.rs b/src/datatypes/src/types/duration_type.rs index e5255357288d..daa7bd768d28 100644 --- a/src/datatypes/src/types/duration_type.rs +++ b/src/datatypes/src/types/duration_type.rs @@ -50,6 +50,16 @@ pub enum DurationType { } impl DurationType { + /// Creates time type from `TimeUnit`. + pub fn from_unit(unit: TimeUnit) -> Self { + match unit { + TimeUnit::Second => Self::Second(DurationSecondType), + TimeUnit::Millisecond => Self::Millisecond(DurationMillisecondType), + TimeUnit::Microsecond => Self::Microsecond(DurationMicrosecondType), + TimeUnit::Nanosecond => Self::Nanosecond(DurationNanosecondType), + } + } + /// Returns the [`TimeUnit`] of this type. pub fn unit(&self) -> TimeUnit { match self { @@ -179,4 +189,27 @@ mod tests { DurationType::Nanosecond(DurationNanosecondType).unit() ); } + + #[test] + fn test_from_unit() { + assert_eq!( + DurationType::Second(DurationSecondType), + DurationType::from_unit(TimeUnit::Second) + ); + + assert_eq!( + DurationType::Millisecond(DurationMillisecondType), + DurationType::from_unit(TimeUnit::Millisecond) + ); + + assert_eq!( + DurationType::Microsecond(DurationMicrosecondType), + DurationType::from_unit(TimeUnit::Microsecond) + ); + + assert_eq!( + DurationType::Nanosecond(DurationNanosecondType), + DurationType::from_unit(TimeUnit::Nanosecond) + ); + } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 6b0df9778543..c56832cb1d5d 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -380,7 +380,7 @@ pub fn duration_to_scalar_value(unit: TimeUnit, val: Option) -> ScalarValue TimeUnit::Second => ScalarValue::DurationSecond(val), TimeUnit::Millisecond => ScalarValue::DurationMillisecond(val), TimeUnit::Microsecond => ScalarValue::DurationMicrosecond(val), - TimeUnit::Nanosecond => ScalarValue::DurationMicrosecond(val), + TimeUnit::Nanosecond => ScalarValue::DurationNanosecond(val), } } @@ -1338,6 +1338,46 @@ mod tests { ScalarValue::Time64Nanosecond(None).try_into().unwrap() ); + assert_eq!( + Value::Duration(Duration::new_second(1)), + ScalarValue::DurationSecond(Some(1)).try_into().unwrap() + ); + assert_eq!( + Value::Null, + ScalarValue::DurationSecond(None).try_into().unwrap() + ); + + assert_eq!( + Value::Duration(Duration::new_millisecond(1)), + ScalarValue::DurationMillisecond(Some(1)) + .try_into() + .unwrap() + ); + assert_eq!( + Value::Null, + ScalarValue::DurationMillisecond(None).try_into().unwrap() + ); + + assert_eq!( + Value::Duration(Duration::new_microsecond(1)), + ScalarValue::DurationMicrosecond(Some(1)) + .try_into() + .unwrap() + ); + assert_eq!( + Value::Null, + ScalarValue::DurationMicrosecond(None).try_into().unwrap() + ); + + assert_eq!( + Value::Duration(Duration::new_nanosecond(1)), + ScalarValue::DurationNanosecond(Some(1)).try_into().unwrap() + ); + assert_eq!( + Value::Null, + ScalarValue::DurationNanosecond(None).try_into().unwrap() + ); + let result: Result = ScalarValue::Decimal128(Some(1), 0, 0).try_into(); assert!(result .unwrap_err() @@ -1490,6 +1530,22 @@ mod tests { &ConcreteDataType::interval_month_day_nano_datatype(), &Value::Interval(Interval::from_month_day_nano(1, 2, 3)), ); + check_type_and_value( + &ConcreteDataType::duration_second_datatype(), + &Value::Duration(Duration::new_second(1)), + ); + check_type_and_value( + &ConcreteDataType::duration_millisecond_datatype(), + &Value::Duration(Duration::new_millisecond(1)), + ); + check_type_and_value( + &ConcreteDataType::duration_microsecond_datatype(), + &Value::Duration(Duration::new_microsecond(1)), + ); + check_type_and_value( + &ConcreteDataType::duration_nanosecond_datatype(), + &Value::Duration(Duration::new_nanosecond(1)), + ); } #[test] @@ -1589,6 +1645,10 @@ mod tests { serde_json::Value::Number(1.into()), to_json(Value::Time(Time::new_millisecond(1))) ); + assert_eq!( + serde_json::Value::Number(1.into()), + to_json(Value::Duration(Duration::new_millisecond(1))) + ); let json_value: serde_json::Value = serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap(); @@ -1648,6 +1708,7 @@ mod tests { check_as_value_ref!(Timestamp, Timestamp::new_millisecond(1)); check_as_value_ref!(Time, Time::new_millisecond(1)); check_as_value_ref!(Interval, Interval::from_month_day_nano(1, 2, 3)); + check_as_value_ref!(Duration, Duration::new_millisecond(1)); assert_eq!( ValueRef::String("hello"), @@ -1698,6 +1759,7 @@ mod tests { check_as_correct!(Date::new(123), Date, as_date); check_as_correct!(DateTime::new(12), DateTime, as_datetime); check_as_correct!(Time::new_second(12), Time, as_time); + check_as_correct!(Duration::new_second(12), Duration, as_duration); let list = ListValue { items: None, datatype: ConcreteDataType::int32_datatype(), @@ -1749,6 +1811,10 @@ mod tests { Value::Time(Time::new(1000, TimeUnit::Millisecond)).to_string(), "08:00:01+0800" ); + assert_eq!( + Value::Duration(Duration::new_millisecond(1000)).to_string(), + "1000ms" + ); assert_eq!( Value::List(ListValue::new( Some(Box::new(vec![Value::Int8(1), Value::Int8(2)])), @@ -1984,6 +2050,31 @@ mod tests { .try_to_scalar_value(&ConcreteDataType::time_nanosecond_datatype()) .unwrap() ); + + assert_eq!( + ScalarValue::DurationSecond(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::duration_second_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::DurationMillisecond(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::duration_millisecond_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::DurationMicrosecond(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::duration_microsecond_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::DurationNanosecond(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::duration_nanosecond_datatype()) + .unwrap() + ); } #[test] @@ -2049,4 +2140,24 @@ mod tests { time_to_scalar_value(TimeUnit::Nanosecond, Some(1)).unwrap() ); } + + #[test] + fn test_duration_to_scalar_value() { + assert_eq!( + ScalarValue::DurationSecond(Some(1)), + duration_to_scalar_value(TimeUnit::Second, Some(1)) + ); + assert_eq!( + ScalarValue::DurationMillisecond(Some(1)), + duration_to_scalar_value(TimeUnit::Millisecond, Some(1)) + ); + assert_eq!( + ScalarValue::DurationMicrosecond(Some(1)), + duration_to_scalar_value(TimeUnit::Microsecond, Some(1)) + ); + assert_eq!( + ScalarValue::DurationNanosecond(Some(1)), + duration_to_scalar_value(TimeUnit::Nanosecond, Some(1)) + ); + } } diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 34c8b016aaae..f9c02f76a4f5 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -20,11 +20,12 @@ use crate::data_type::DataType; use crate::types::{DurationType, TimeType, TimestampType}; use crate::vectors::constant::ConstantVector; use crate::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, IntervalDayTimeVector, - IntervalMonthDayNanoVector, IntervalYearMonthVector, ListVector, PrimitiveVector, StringVector, - TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, - TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, - TimestampSecondVector, Vector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, + DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, + IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, ListVector, + PrimitiveVector, StringVector, TimeMicrosecondVector, TimeMillisecondVector, + TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, Vector, }; use crate::with_match_primitive_type_id; @@ -138,16 +139,16 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { }, Duration(d) => match d { DurationType::Second(_) => { - is_vector_eq!(TimeSecondVector, lhs, rhs) + is_vector_eq!(DurationSecondVector, lhs, rhs) } DurationType::Millisecond(_) => { - is_vector_eq!(TimeMillisecondVector, lhs, rhs) + is_vector_eq!(DurationMillisecondVector, lhs, rhs) } DurationType::Microsecond(_) => { - is_vector_eq!(TimeMicrosecondVector, lhs, rhs) + is_vector_eq!(DurationMicrosecondVector, lhs, rhs) } DurationType::Nanosecond(_) => { - is_vector_eq!(TimeNanosecondVector, lhs, rhs) + is_vector_eq!(DurationNanosecondVector, lhs, rhs) } }, } @@ -157,8 +158,9 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { mod tests { use super::*; use crate::vectors::{ - list, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, - NullVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, + list, DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, + Int8Vector, NullVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, }; fn assert_vector_ref_eq(vector: VectorRef) { @@ -236,6 +238,10 @@ mod tests { assert_vector_ref_eq(Arc::new(IntervalMonthDayNanoVector::from_values([ 1000, 2000, 3000, 4000, ]))); + assert_vector_ref_eq(Arc::new(DurationSecondVector::from_values([300, 310]))); + assert_vector_ref_eq(Arc::new(DurationMillisecondVector::from_values([300, 310]))); + assert_vector_ref_eq(Arc::new(DurationMicrosecondVector::from_values([300, 310]))); + assert_vector_ref_eq(Arc::new(DurationNanosecondVector::from_values([300, 310]))); } #[test] @@ -301,5 +307,10 @@ mod tests { Arc::new(IntervalYearMonthVector::from_values([1000, 2000])), Arc::new(IntervalYearMonthVector::from_values([2100, 1200])), ); + + assert_vector_ref_ne( + Arc::new(DurationSecondVector::from_values([300, 310])), + Arc::new(DurationSecondVector::from_values([300, 320])), + ); } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 1a82add67580..d5cf69703f6d 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -186,7 +186,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Timestamp(v) => row_writer .write_col(v.to_timezone_aware_string(query_context.time_zone()))?, Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?, - Value::Duration(v) => row_writer.write_col(std::time::Duration::from(v))?, + Value::Duration(v) => row_writer.write_col(v.to_std_duration())?, Value::List(_) => { return Err(Error::Internal { err_msg: format!( From b3b13352d57eb85941a5f7f664e0d523ed63f169 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 15 Aug 2023 16:54:23 +0800 Subject: [PATCH 4/9] chore: style and test case. --- src/api/src/helper.rs | 11 +- src/common/time/src/duration.rs | 1 + src/datatypes/src/duration.rs | 19 ++ src/datatypes/src/vectors/primitive.rs | 85 ++++++++- src/frontend/src/instance/grpc.rs | 1 - src/mito2/src/proto_util.rs | 241 ------------------------- 6 files changed, 106 insertions(+), 252 deletions(-) delete mode 100644 src/mito2/src/proto_util.rs diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index de19820da1f7..5a1ad874d845 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -149,17 +149,17 @@ impl TryFrom for ColumnDataTypeWrapper { IntervalType::DayTime(_) => ColumnDataType::IntervalDayTime, IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano, }, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => { - return error::IntoColumnDataTypeSnafu { from: datatype }.fail() - } ConcreteDataType::Duration(d) => match d { DurationType::Second(_) => ColumnDataType::DurationSecond, DurationType::Millisecond(_) => ColumnDataType::DurationMillisecond, DurationType::Microsecond(_) => ColumnDataType::DurationMicrosecond, DurationType::Nanosecond(_) => ColumnDataType::DurationNanosecond, }, + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => { + return error::IntoColumnDataTypeSnafu { from: datatype }.fail() + } }); Ok(datatype) } @@ -837,6 +837,7 @@ pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, ConcreteDataType::Null(_) + | ConcreteDataType::Duration(_) | ConcreteDataType::Interval(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => return None, diff --git a/src/common/time/src/duration.rs b/src/common/time/src/duration.rs index b109bf05d672..06742ba294b5 100644 --- a/src/common/time/src/duration.rs +++ b/src/common/time/src/duration.rs @@ -88,6 +88,7 @@ impl Duration { (sec_div, nsec) } + /// Convert to std::time::Duration. pub fn to_std_duration(self) -> std::time::Duration { self.into() } diff --git a/src/datatypes/src/duration.rs b/src/datatypes/src/duration.rs index d6f79a7fe137..af1a39da1fe0 100644 --- a/src/datatypes/src/duration.rs +++ b/src/datatypes/src/duration.rs @@ -139,4 +139,23 @@ mod tests { assert_eq!(d, d.as_scalar_ref()); assert_eq!(d, d.to_owned_scalar()); } + + #[test] + fn test_duration_to_native_type() { + let duration = DurationSecond::new(456); + let native: i64 = duration.into_native(); + assert_eq!(native, 456); + + let duration = DurationMillisecond::new(456); + let native: i64 = duration.into_native(); + assert_eq!(native, 456); + + let duration = DurationMicrosecond::new(456); + let native: i64 = duration.into_native(); + assert_eq!(native, 456); + + let duration = DurationNanosecond::new(456); + let native: i64 = duration.into_native(); + assert_eq!(native, 456); + } } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 670211948b7c..027f09fbfeb6 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -23,7 +23,10 @@ use arrow::array::{ TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; -use arrow_array::{IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray}; +use arrow_array::{ + DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray, + DurationSecondArray, IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray, +}; use arrow_schema::DataType; use serde_json::Value as JsonValue; use snafu::OptionExt; @@ -198,6 +201,45 @@ impl PrimitiveVector { Ok(Self::new(concrete_array)) } + pub fn try_from_arrow_duration_array(array: impl AsRef) -> Result { + let array = array.as_ref(); + let array_data = match array.data_type() { + DataType::Duration(unit) => match unit { + arrow_schema::TimeUnit::Second => array + .as_any() + .downcast_ref::() + .unwrap() + .to_data(), + arrow_schema::TimeUnit::Millisecond => array + .as_any() + .downcast_ref::() + .unwrap() + .to_data(), + arrow_schema::TimeUnit::Microsecond => array + .as_any() + .downcast_ref::() + .unwrap() + .to_data(), + arrow_schema::TimeUnit::Nanosecond => array + .as_any() + .downcast_ref::() + .unwrap() + .to_data(), + }, + arrow_type => { + return CastTypeSnafu { + msg: format!( + "Failed to cast arrow array {:?} to interval vector", + arrow_type, + ), + } + .fail()?; + } + }; + let concrete_array = PrimitiveArray::::from(array_data); + Ok(Self::new(concrete_array)) + } + pub fn from_slice>(slice: P) -> Self { let iter = slice.as_ref().iter().copied(); Self { @@ -538,7 +580,7 @@ mod tests { Time64NanosecondArray, }; use arrow::datatypes::DataType as ArrowDataType; - use arrow_array::{IntervalDayTimeArray, IntervalYearMonthArray}; + use arrow_array::{DurationSecondArray, IntervalDayTimeArray, IntervalYearMonthArray}; use serde_json; use super::*; @@ -546,9 +588,11 @@ mod tests { use crate::serialize::Serializable; use crate::types::Int64Type; use crate::vectors::{ - IntervalDayTimeVector, IntervalYearMonthVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector, + TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, + TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, + TimestampSecondVector, }; fn check_vec(v: Int32Vector) { @@ -779,4 +823,35 @@ mod tests { vector ); } + + #[test] + fn test_try_from_arrow_duration_array() { + let array: ArrayRef = Arc::new(DurationSecondArray::from(vec![1000, 2000, 3000])); + let vector = DurationSecondVector::try_from_arrow_duration_array(array).unwrap(); + assert_eq!( + DurationSecondVector::from_values(vec![1000, 2000, 3000]), + vector + ); + + let array: ArrayRef = Arc::new(DurationMillisecondArray::from(vec![1000, 2000, 3000])); + let vector = DurationMillisecondVector::try_from_arrow_duration_array(array).unwrap(); + assert_eq!( + DurationMillisecondVector::from_values(vec![1000, 2000, 3000]), + vector + ); + + let array: ArrayRef = Arc::new(DurationMicrosecondArray::from(vec![1000, 2000, 3000])); + let vector = DurationMicrosecondVector::try_from_arrow_duration_array(array).unwrap(); + assert_eq!( + DurationMicrosecondVector::from_values(vec![1000, 2000, 3000]), + vector + ); + + let array: ArrayRef = Arc::new(DurationNanosecondArray::from(vec![1000, 2000, 3000])); + let vector = DurationNanosecondVector::try_from_arrow_duration_array(array).unwrap(); + assert_eq!( + DurationNanosecondVector::from_values(vec![1000, 2000, 3000]), + vector + ); + } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 0f1cab8b9107..74947581dc2d 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -49,7 +49,6 @@ impl GrpcQueryHandler for Instance { .context(PermissionSnafu)?; let output = match request { - Request::RowInserts(_) | Request::RowDelete(_) => unimplemented!(), Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?, Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs deleted file mode 100644 index 12b16c06b035..000000000000 --- a/src/mito2/src/proto_util.rs +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Utilities to process protobuf messages. - -use common_time::timestamp::TimeUnit; -use datatypes::prelude::ConcreteDataType; -use datatypes::types::{TimeType, TimestampType}; -use datatypes::value::Value; -use greptime_proto::v1::{self, ColumnDataType}; -use store_api::storage::OpType; - -use crate::metadata::SemanticType; - -/// Returns true if the pb semantic type is valid. -pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool { - type_value == semantic_type as i32 -} - -/// Returns true if the pb type value is valid. -pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { - let Some(column_type) = ColumnDataType::from_i32(type_value) else { - return false; - }; - - is_column_type_eq(column_type, expect_type) -} - -/// Convert value into proto's value. -pub(crate) fn to_proto_value(value: Value) -> Option { - let proto_value = match value { - Value::Null => v1::Value { value: None }, - Value::Boolean(v) => v1::Value { - value: Some(v1::value::Value::BoolValue(v)), - }, - Value::UInt8(v) => v1::Value { - value: Some(v1::value::Value::U8Value(v.into())), - }, - Value::UInt16(v) => v1::Value { - value: Some(v1::value::Value::U16Value(v.into())), - }, - Value::UInt32(v) => v1::Value { - value: Some(v1::value::Value::U32Value(v)), - }, - Value::UInt64(v) => v1::Value { - value: Some(v1::value::Value::U64Value(v)), - }, - Value::Int8(v) => v1::Value { - value: Some(v1::value::Value::I8Value(v.into())), - }, - Value::Int16(v) => v1::Value { - value: Some(v1::value::Value::I16Value(v.into())), - }, - Value::Int32(v) => v1::Value { - value: Some(v1::value::Value::I32Value(v)), - }, - Value::Int64(v) => v1::Value { - value: Some(v1::value::Value::I64Value(v)), - }, - Value::Float32(v) => v1::Value { - value: Some(v1::value::Value::F32Value(*v)), - }, - Value::Float64(v) => v1::Value { - value: Some(v1::value::Value::F64Value(*v)), - }, - Value::String(v) => v1::Value { - value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())), - }, - Value::Binary(v) => v1::Value { - value: Some(v1::value::Value::BinaryValue(v.to_vec())), - }, - Value::Date(v) => v1::Value { - value: Some(v1::value::Value::DateValue(v.val())), - }, - Value::DateTime(v) => v1::Value { - value: Some(v1::value::Value::DatetimeValue(v.val())), - }, - Value::Timestamp(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value: Some(v1::value::Value::TsSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value: Some(v1::value::Value::TsMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value: Some(v1::value::Value::TsMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value: Some(v1::value::Value::TsNanosecondValue(v.value())), - }, - }, - Value::Time(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value: Some(v1::value::Value::TimeSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value: Some(v1::value::Value::TimeMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value: Some(v1::value::Value::TimeMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value: Some(v1::value::Value::TimeNanosecondValue(v.value())), - }, - }, - Value::Duration(_) | Value::Interval(_) | Value::List(_) => return None, - }; - - Some(proto_value) -} - -/// Returns the [ColumnDataType] of the value. -/// -/// If value is null, returns `None`. -pub(crate) fn proto_value_type(value: &v1::Value) -> Option { - let value_data = value.value.as_ref()?; - let value_type = match value_data { - v1::value::Value::I8Value(_) => ColumnDataType::Int8, - v1::value::Value::I16Value(_) => ColumnDataType::Int16, - v1::value::Value::I32Value(_) => ColumnDataType::Int32, - v1::value::Value::I64Value(_) => ColumnDataType::Int64, - v1::value::Value::U8Value(_) => ColumnDataType::Uint8, - v1::value::Value::U16Value(_) => ColumnDataType::Uint16, - v1::value::Value::U32Value(_) => ColumnDataType::Uint32, - v1::value::Value::U64Value(_) => ColumnDataType::Uint64, - v1::value::Value::F32Value(_) => ColumnDataType::Float32, - v1::value::Value::F64Value(_) => ColumnDataType::Float64, - v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, - v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, - v1::value::Value::StringValue(_) => ColumnDataType::String, - v1::value::Value::DateValue(_) => ColumnDataType::Date, - v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, - v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, - v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, - v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, - v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, - v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, - v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, - v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, - v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, - }; - Some(value_type) -} - -// TODO(yingwen): Support conversion in greptime-proto. -/// Creates value for i64. -#[cfg(test)] -pub(crate) fn i64_value(data: i64) -> v1::Value { - v1::Value { - value: Some(v1::value::Value::I64Value(data)), - } -} - -/// Creates value for timestamp millis. -#[cfg(test)] -pub(crate) fn ts_ms_value(data: i64) -> v1::Value { - v1::Value { - value: Some(v1::value::Value::TsMillisecondValue(data)), - } -} - -/// Convert [ConcreteDataType] to [ColumnDataType]. -pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { - let column_data_type = match data_type { - ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, - ConcreteDataType::Int8(_) => ColumnDataType::Int8, - ConcreteDataType::Int16(_) => ColumnDataType::Int16, - ConcreteDataType::Int32(_) => ColumnDataType::Int32, - ConcreteDataType::Int64(_) => ColumnDataType::Int64, - ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, - ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, - ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, - ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, - ConcreteDataType::Float32(_) => ColumnDataType::Float32, - ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) => ColumnDataType::Binary, - ConcreteDataType::String(_) => ColumnDataType::String, - ConcreteDataType::Date(_) => ColumnDataType::Date, - ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, - ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { - ColumnDataType::TimestampMillisecond - } - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { - ColumnDataType::TimestampMicrosecond - } - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { - ColumnDataType::TimestampNanosecond - } - ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, - ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, - ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, - ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, - ConcreteDataType::Null(_) - | ConcreteDataType::Duration(_) - | ConcreteDataType::Interval(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => return None, - }; - - Some(column_data_type) -} - -/// Convert semantic type to proto's semantic type -pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType { - match semantic_type { - SemanticType::Tag => v1::SemanticType::Tag, - SemanticType::Field => v1::SemanticType::Field, - SemanticType::Timestamp => v1::SemanticType::Timestamp, - } -} - -/// Convert op type to proto's op type. -pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType { - match op_type { - OpType::Delete => v1::mito::OpType::Delete, - OpType::Put => v1::mito::OpType::Put, - } -} - -/// Returns true if the column type is equal to expected type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - if let Some(expect) = to_column_data_type(expect_type) { - column_type == expect - } else { - false - } -} - -// TODO(yingwen): Tests. From f0ffccf339098db715c321cef2649064aa8bf1d4 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 25 Aug 2023 16:08:14 +0800 Subject: [PATCH 5/9] fix: update greptime-proto version and helper.rs --- src/api/src/helper.rs | 100 +++++++++++++++++++++++++++-- src/common/grpc-expr/src/insert.rs | 32 +++++++++ src/mito2/src/row_converter.rs | 4 +- 3 files changed, 129 insertions(+), 7 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 5a1ad874d845..71b2b41b520f 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -18,16 +18,17 @@ use common_base::BitVec; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::TimeUnit; -use common_time::{Date, DateTime, Interval, Timestamp}; +use common_time::{Date, DateTime, Duration, Interval, Timestamp}; use datatypes::prelude::{ConcreteDataType, ValueRef}; use datatypes::scalars::ScalarVector; use datatypes::types::{ - Int16Type, Int8Type, DurationType, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, + DurationType, Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, }; use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, + DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, Float32Vector, + Float64Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, @@ -431,6 +432,10 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { let interval = Interval::from_month_day_nano(v.months, v.days, v.nanoseconds); ValueRef::Interval(interval) } + ValueData::DurSecondValue(v) => ValueRef::Duration(Duration::new_second(*v)), + ValueData::DurMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), + ValueData::DurMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), + ValueData::DurNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), } } @@ -503,6 +508,20 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> )) } }, + ConcreteDataType::Duration(unit) => match unit { + DurationType::Second(_) => { + Arc::new(DurationSecondVector::from_vec(values.dur_second_values)) + } + DurationType::Millisecond(_) => Arc::new(DurationMillisecondVector::from_vec( + values.dur_millisecond_values, + )), + DurationType::Microsecond(_) => Arc::new(DurationMicrosecondVector::from_vec( + values.dur_microsecond_values, + )), + DurationType::Nanosecond(_) => Arc::new(DurationNanosecondVector::from_vec( + values.dur_nanosecond_values, + )), + }, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } @@ -653,6 +672,26 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< )) }) .collect(), + ConcreteDataType::Duration(DurationType::Second(_)) => values + .dur_second_values + .into_iter() + .map(|v| Value::Duration(Duration::new_second(v))) + .collect(), + ConcreteDataType::Duration(DurationType::Millisecond(_)) => values + .dur_millisecond_values + .into_iter() + .map(|v| Value::Duration(Duration::new_millisecond(v))) + .collect(), + ConcreteDataType::Duration(DurationType::Microsecond(_)) => values + .dur_microsecond_values + .into_iter() + .map(|v| Value::Duration(Duration::new_microsecond(v))) + .collect(), + ConcreteDataType::Duration(DurationType::Nanosecond(_)) => values + .dur_nanosecond_values + .into_iter() + .map(|v| Value::Duration(Duration::new_nanosecond(v))) + .collect(), ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } @@ -763,6 +802,20 @@ pub fn to_proto_value(value: Value) -> Option { )), }, }, + Value::Duration(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value_data: Some(ValueData::DurSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value_data: Some(ValueData::DurMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value_data: Some(ValueData::DurMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value_data: Some(ValueData::DurNanosecondValue(v.value())), + }, + }, Value::List(_) => return None, }; @@ -800,6 +853,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth, ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime, ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano, + ValueData::DurSecondValue(_) => ColumnDataType::DurationSecond, + ValueData::DurMillisecondValue(_) => ColumnDataType::DurationMillisecond, + ValueData::DurMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, + ValueData::DurNanosecondValue(_) => ColumnDataType::DurationNanosecond, }; Some(value_type) } @@ -918,7 +975,7 @@ mod tests { use datatypes::types::{ Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, - TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType, + TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType, DurationSecondType, DurationMillisecondType, UInt32Type, }; use datatypes::vectors::{ @@ -1471,6 +1528,39 @@ mod tests { assert_eq!(expect, actual); } + #[test] + fn test_convert_duration_values() { + // second + let actual = pb_values_to_values( + &ConcreteDataType::Duration(DurationType::Second(DurationSecondType)), + Values { + dur_second_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Duration(Duration::new_second(1_i64)), + Value::Duration(Duration::new_second(2_i64)), + Value::Duration(Duration::new_second(3_i64)), + ]; + assert_eq!(expect, actual); + + // millisecond + let actual = pb_values_to_values( + &ConcreteDataType::Duration(DurationType::Millisecond(DurationMillisecondType)), + Values { + dur_millisecond_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Duration(Duration::new_millisecond(1_i64)), + Value::Duration(Duration::new_millisecond(2_i64)), + Value::Duration(Duration::new_millisecond(3_i64)), + ]; + assert_eq!(expect, actual); + } + #[test] fn test_convert_interval_values() { // year_month diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index fd38020768ea..9e8b512ab645 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -367,6 +367,38 @@ mod tests { assert_eq!(expect, actual); } + #[test] + fn test_to_table_insert_request() { + let (columns, row_count) = mock_insert_batch(); + let request = GrpcInsertRequest { + table_name: "demo".to_string(), + columns, + row_count, + region_number: 0, + }; + let insert_req = to_table_insert_request("greptime", "public", request).unwrap(); + + assert_eq!("greptime", insert_req.catalog_name); + assert_eq!("public", insert_req.schema_name); + assert_eq!("demo", insert_req.table_name); + + let host = insert_req.columns_values.get("host").unwrap(); + assert_eq!(Value::String("host1".into()), host.get(0)); + assert_eq!(Value::String("host2".into()), host.get(1)); + + let cpu = insert_req.columns_values.get("cpu").unwrap(); + assert_eq!(Value::Float64(0.31.into()), cpu.get(0)); + assert_eq!(Value::Null, cpu.get(1)); + + let memory = insert_req.columns_values.get("memory").unwrap(); + assert_eq!(Value::Null, memory.get(0)); + assert_eq!(Value::Float64(0.1.into()), memory.get(1)); + + let ts = insert_req.columns_values.get("ts").unwrap(); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0)); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1)); + } + #[test] fn test_is_null() { let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index f1583c7ba570..f6b3119e3f9f 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -321,10 +321,10 @@ mod tests { ConcreteDataType::duration_millisecond_datatype(), ConcreteDataType::int64_datatype(), ], - &[vec![ + vec![ Value::Duration(Duration::new_millisecond(44)), Value::Int64(45), - ]], + ], ) } From 38c27b58384722a346d98e80e4e1f369e8aa6354 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 15 Sep 2023 18:59:09 +0800 Subject: [PATCH 6/9] chore: fix type name. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 86 ++++++++++++++------------ src/common/grpc-expr/src/insert.rs | 72 +-------------------- src/common/grpc/src/select.rs | 10 +-- src/operator/src/req_convert/common.rs | 16 +++++ 6 files changed, 72 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f0638fe574c..d413d9ae49ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4102,7 +4102,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=637c54338f0cd0a333fd67bcb37d79dabb014453#637c54338f0cd0a333fd67bcb37d79dabb014453" +source = "git+https://github.com/Quenkar/greptime-proto.git?rev=9a5dfe9ba412c80827a425d0b38dcd7acd69c561#9a5dfe9ba412c80827a425d0b38dcd7acd69c561" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2f9656f1fe52..c173753c0e74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "637c54338f0cd0a333fd67bcb37d79dabb014453" } +greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "9a5dfe9ba412c80827a425d0b38dcd7acd69c561" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 71b2b41b520f..5d08cb12f619 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -273,19 +273,19 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ..Default::default() }, ColumnDataType::DurationSecond => Values { - dur_second_values: Vec::with_capacity(capacity), + duration_second_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::DurationMillisecond => Values { - dur_millisecond_values: Vec::with_capacity(capacity), + duration_millisecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::DurationMicrosecond => Values { - dur_microsecond_values: Vec::with_capacity(capacity), + duration_microsecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::DurationNanosecond => Values { - dur_nanosecond_values: Vec::with_capacity(capacity), + duration_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, } @@ -336,10 +336,10 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { .push(convert_i128_to_interval(val.to_i128())), }, Value::Duration(val) => match val.unit() { - TimeUnit::Second => values.dur_second_values.push(val.value()), - TimeUnit::Millisecond => values.dur_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.dur_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.dur_nanosecond_values.push(val.value()), + TimeUnit::Second => values.duration_second_values.push(val.value()), + TimeUnit::Millisecond => values.duration_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.duration_microsecond_values.push(val.value()), + TimeUnit::Nanosecond => values.duration_nanosecond_values.push(val.value()), }, Value::List(_) => unreachable!(), }); @@ -432,10 +432,10 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { let interval = Interval::from_month_day_nano(v.months, v.days, v.nanoseconds); ValueRef::Interval(interval) } - ValueData::DurSecondValue(v) => ValueRef::Duration(Duration::new_second(*v)), - ValueData::DurMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), - ValueData::DurMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), - ValueData::DurNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), + ValueData::DurationSecondValue(v) => ValueRef::Duration(Duration::new_second(*v)), + ValueData::DurationMillisecondValue(v) => ValueRef::Duration(Duration::new_millisecond(*v)), + ValueData::DurationMicrosecondValue(v) => ValueRef::Duration(Duration::new_microsecond(*v)), + ValueData::DurationNanosecondValue(v) => ValueRef::Duration(Duration::new_nanosecond(*v)), } } @@ -509,17 +509,17 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> } }, ConcreteDataType::Duration(unit) => match unit { - DurationType::Second(_) => { - Arc::new(DurationSecondVector::from_vec(values.dur_second_values)) - } + DurationType::Second(_) => Arc::new(DurationSecondVector::from_vec( + values.duration_second_values, + )), DurationType::Millisecond(_) => Arc::new(DurationMillisecondVector::from_vec( - values.dur_millisecond_values, + values.duration_millisecond_values, )), DurationType::Microsecond(_) => Arc::new(DurationMicrosecondVector::from_vec( - values.dur_microsecond_values, + values.duration_microsecond_values, )), DurationType::Nanosecond(_) => Arc::new(DurationNanosecondVector::from_vec( - values.dur_nanosecond_values, + values.duration_nanosecond_values, )), }, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { @@ -673,22 +673,22 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< }) .collect(), ConcreteDataType::Duration(DurationType::Second(_)) => values - .dur_second_values + .duration_second_values .into_iter() .map(|v| Value::Duration(Duration::new_second(v))) .collect(), ConcreteDataType::Duration(DurationType::Millisecond(_)) => values - .dur_millisecond_values + .duration_millisecond_values .into_iter() .map(|v| Value::Duration(Duration::new_millisecond(v))) .collect(), ConcreteDataType::Duration(DurationType::Microsecond(_)) => values - .dur_microsecond_values + .duration_microsecond_values .into_iter() .map(|v| Value::Duration(Duration::new_microsecond(v))) .collect(), ConcreteDataType::Duration(DurationType::Nanosecond(_)) => values - .dur_nanosecond_values + .duration_nanosecond_values .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), @@ -804,16 +804,16 @@ pub fn to_proto_value(value: Value) -> Option { }, Value::Duration(v) => match v.unit() { TimeUnit::Second => v1::Value { - value_data: Some(ValueData::DurSecondValue(v.value())), + value_data: Some(ValueData::DurationSecondValue(v.value())), }, TimeUnit::Millisecond => v1::Value { - value_data: Some(ValueData::DurMillisecondValue(v.value())), + value_data: Some(ValueData::DurationMillisecondValue(v.value())), }, TimeUnit::Microsecond => v1::Value { - value_data: Some(ValueData::DurMicrosecondValue(v.value())), + value_data: Some(ValueData::DurationMicrosecondValue(v.value())), }, TimeUnit::Nanosecond => v1::Value { - value_data: Some(ValueData::DurNanosecondValue(v.value())), + value_data: Some(ValueData::DurationNanosecondValue(v.value())), }, }, Value::List(_) => return None, @@ -853,10 +853,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth, ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime, ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano, - ValueData::DurSecondValue(_) => ColumnDataType::DurationSecond, - ValueData::DurMillisecondValue(_) => ColumnDataType::DurationMillisecond, - ValueData::DurMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, - ValueData::DurNanosecondValue(_) => ColumnDataType::DurationNanosecond, + ValueData::DurationSecondValue(_) => ColumnDataType::DurationSecond, + ValueData::DurationMillisecondValue(_) => ColumnDataType::DurationMillisecond, + ValueData::DurationMicrosecondValue(_) => ColumnDataType::DurationMicrosecond, + ValueData::DurationNanosecondValue(_) => ColumnDataType::DurationNanosecond, }; Some(value_type) } @@ -955,6 +955,12 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { ValueData::IntervalMonthDayNanoValues(convert_i128_to_interval(v.to_i128())) } }), + Value::Duration(v) => Some(match v.unit() { + TimeUnit::Second => ValueData::DurationSecondValue(v.value()), + TimeUnit::Millisecond => ValueData::DurationMillisecondValue(v.value()), + TimeUnit::Microsecond => ValueData::DurationMicrosecondValue(v.value()), + TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), + }), Value::List(_) => unreachable!(), }, } @@ -974,9 +980,9 @@ mod tests { use std::sync::Arc; use datatypes::types::{ - Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, - TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType, DurationSecondType, DurationMillisecondType, - UInt32Type, + DurationMillisecondType, DurationSecondType, Int32Type, IntervalDayTimeType, + IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType, TimeSecondType, + TimestampMillisecondType, TimestampSecondType, UInt32Type, }; use datatypes::vectors::{ BooleanVector, DurationMicrosecondVector, DurationMillisecondVector, @@ -1060,7 +1066,7 @@ mod tests { assert_eq!(2, values.capacity()); let values = values_with_capacity(ColumnDataType::DurationMillisecond, 2); - let values = values.dur_millisecond_values; + let values = values.duration_millisecond_values; assert_eq!(2, values.capacity()); } @@ -1403,28 +1409,28 @@ mod tests { push_vals(&mut column, 3, vector); assert_eq!( vec![1, 2, 3], - column.values.as_ref().unwrap().dur_nanosecond_values + column.values.as_ref().unwrap().duration_nanosecond_values ); let vector = Arc::new(DurationMicrosecondVector::from_vec(vec![7, 8, 9])); push_vals(&mut column, 3, vector); assert_eq!( vec![7, 8, 9], - column.values.as_ref().unwrap().dur_microsecond_values + column.values.as_ref().unwrap().duration_microsecond_values ); let vector = Arc::new(DurationMillisecondVector::from_vec(vec![4, 5, 6])); push_vals(&mut column, 3, vector); assert_eq!( vec![4, 5, 6], - column.values.as_ref().unwrap().dur_millisecond_values + column.values.as_ref().unwrap().duration_millisecond_values ); let vector = Arc::new(DurationSecondVector::from_vec(vec![10, 11, 12])); push_vals(&mut column, 3, vector); assert_eq!( vec![10, 11, 12], - column.values.as_ref().unwrap().dur_second_values + column.values.as_ref().unwrap().duration_second_values ); } @@ -1534,7 +1540,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Duration(DurationType::Second(DurationSecondType)), Values { - dur_second_values: vec![1_i64, 2_i64, 3_i64], + duration_second_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); @@ -1549,7 +1555,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Duration(DurationType::Millisecond(DurationMillisecondType)), Values { - dur_millisecond_values: vec![1_i64, 2_i64, 3_i64], + duration_millisecond_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 9e8b512ab645..5b173b6fdc67 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -240,7 +240,7 @@ mod tests { .iter() .find(|c| c.name == "duration") .unwrap() - .datatype + .data_type ) .unwrap() ) @@ -321,84 +321,18 @@ mod tests { ); let duration_column = &add_columns.add_columns[4]; - assert!(!duration_column.is_key); assert_eq!( ConcreteDataType::duration_millisecond_datatype(), ConcreteDataType::from( ColumnDataTypeWrapper::try_new( - duration_column.column_def.as_ref().unwrap().datatype + duration_column.column_def.as_ref().unwrap().data_type ) .unwrap() ) ); } - #[test] - fn test_convert_duration_values() { - // second - let actual = convert_values( - &ConcreteDataType::Duration(DurationType::Second(DurationSecondType)), - Values { - dur_second_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Duration(Duration::new_second(1_i64)), - Value::Duration(Duration::new_second(2_i64)), - Value::Duration(Duration::new_second(3_i64)), - ]; - assert_eq!(expect, actual); - - // millisecond - let actual = convert_values( - &ConcreteDataType::Duration(DurationType::Millisecond(DurationMillisecondType)), - Values { - dur_millisecond_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Duration(Duration::new_millisecond(1_i64)), - Value::Duration(Duration::new_millisecond(2_i64)), - Value::Duration(Duration::new_millisecond(3_i64)), - ]; - assert_eq!(expect, actual); - } - - #[test] - fn test_to_table_insert_request() { - let (columns, row_count) = mock_insert_batch(); - let request = GrpcInsertRequest { - table_name: "demo".to_string(), - columns, - row_count, - region_number: 0, - }; - let insert_req = to_table_insert_request("greptime", "public", request).unwrap(); - - assert_eq!("greptime", insert_req.catalog_name); - assert_eq!("public", insert_req.schema_name); - assert_eq!("demo", insert_req.table_name); - - let host = insert_req.columns_values.get("host").unwrap(); - assert_eq!(Value::String("host1".into()), host.get(0)); - assert_eq!(Value::String("host2".into()), host.get(1)); - - let cpu = insert_req.columns_values.get("cpu").unwrap(); - assert_eq!(Value::Float64(0.31.into()), cpu.get(0)); - assert_eq!(Value::Null, cpu.get(1)); - - let memory = insert_req.columns_values.get("memory").unwrap(); - assert_eq!(Value::Null, memory.get(0)); - assert_eq!(Value::Float64(0.1.into()), memory.get(1)); - - let ts = insert_req.columns_values.get("ts").unwrap(); - assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0)); - assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1)); - } - #[test] fn test_is_null() { let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); @@ -487,7 +421,7 @@ mod tests { }; let duration_vals = Values { - dur_millisecond_values: vec![100, 101], + duration_millisecond_values: vec![100, 101], ..Default::default() }; let duration_column = Column { diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 764ae6a3ad5c..85643ea82530 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -217,25 +217,25 @@ pub fn values(arrays: &[VectorRef]) -> Result { ( ConcreteDataType::Duration(DurationType::Second(_)), DurationSecondVector, - dur_second_values, + duration_second_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Millisecond(_)), DurationMillisecondVector, - dur_millisecond_values, + duration_millisecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Microsecond(_)), DurationMicrosecondVector, - dur_microsecond_values, + duration_microsecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Duration(DurationType::Nanosecond(_)), DurationNanosecondVector, - dur_nanosecond_values, + duration_nanosecond_values, |x| { x.into_native() } ) ) @@ -311,7 +311,7 @@ mod tests { let values = values(&[array]).unwrap(); - assert_eq!(vec![1, 2, 3], values.dur_second_values); + assert_eq!(vec![1, 2, 3], values.duration_second_values); } #[test] diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 698bf342103e..21bca1c33acd 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -161,6 +161,22 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { IntervalMonthDayNanoValues, interval_month_day_nano_values ), + (DurationSecond, DurationSecondValue, duration_second_values), + ( + DurationMillisecond, + DurationMillisecondValue, + duration_millisecond_values + ), + ( + DurationMicrosecond, + DurationMicrosecondValue, + duration_microsecond_values + ), + ( + DurationNanosecond, + DurationNanosecondValue, + duration_nanosecond_values + ), ); Ok(()) From caeb0e1534998ea79d4f81b886431ee6d996c18b Mon Sep 17 00:00:00 2001 From: Wei <47681251+QuenKar@users.noreply.github.com> Date: Fri, 15 Sep 2023 19:42:51 +0800 Subject: [PATCH 7/9] Update src/datatypes/src/data_type.rs Co-authored-by: Yingwen --- src/datatypes/src/data_type.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 3906312ad2d2..a37dd2eb7a5f 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -223,7 +223,6 @@ impl ConcreteDataType { /// Try to cast data type as a [`DurationType`]. pub fn as_duration(&self) -> Option { match self { - ConcreteDataType::Int64(_) => Some(DurationType::Millisecond(DurationMillisecondType)), ConcreteDataType::Duration(d) => Some(*d), _ => None, } From c5790235ae5b5a22967917f158da9d6c8c7f1de0 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 15 Sep 2023 19:49:54 +0800 Subject: [PATCH 8/9] chore: cr. --- src/datatypes/src/types/duration_type.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/datatypes/src/types/duration_type.rs b/src/datatypes/src/types/duration_type.rs index daa7bd768d28..8e4516ef41e0 100644 --- a/src/datatypes/src/types/duration_type.rs +++ b/src/datatypes/src/types/duration_type.rs @@ -134,9 +134,6 @@ macro_rules! impl_data_type_for_duration { fn cast_value_ref(value: ValueRef) -> crate::Result> { match value { ValueRef::Null => Ok(None), - ValueRef::Int64(v) =>{ - Ok(Some([]::from(v))) - } ValueRef::Duration(t) => match t.unit() { TimeUnit::$unit => Ok(Some([](t))), other => error::CastTypeSnafu { From 493d32c39a91573beb7dab926b9806346661202f Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 18 Sep 2023 19:19:06 +0800 Subject: [PATCH 9/9] chore: fix greptime-proto --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d413d9ae49ad..6f8b418c6279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4102,7 +4102,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/Quenkar/greptime-proto.git?rev=9a5dfe9ba412c80827a425d0b38dcd7acd69c561#9a5dfe9ba412c80827a425d0b38dcd7acd69c561" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9#9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index c173753c0e74..8bc4a3f799e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/Quenkar/greptime-proto.git", rev = "9a5dfe9ba412c80827a425d0b38dcd7acd69c561" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4"