diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 6884dff3604d..1f8f6f0add68 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -120,6 +120,39 @@ pub(crate) fn to_proto_value(value: Value) -> Option { Some(proto_value) } +/// Returns the [ColumnDataType] of the value. +/// +/// If value is null, returns `None`. +pub(crate) fn proto_value_type(value: &v1::Value) -> Option { + let value_data = value.value.as_ref()?; + let value_type = match value_data { + v1::value::Value::I8Value(_) => ColumnDataType::Int8, + v1::value::Value::I16Value(_) => ColumnDataType::Int16, + v1::value::Value::I32Value(_) => ColumnDataType::Int32, + v1::value::Value::I64Value(_) => ColumnDataType::Int64, + v1::value::Value::U8Value(_) => ColumnDataType::Uint8, + v1::value::Value::U16Value(_) => ColumnDataType::Uint16, + v1::value::Value::U32Value(_) => ColumnDataType::Uint32, + v1::value::Value::U64Value(_) => ColumnDataType::Uint64, + v1::value::Value::F32Value(_) => ColumnDataType::Float32, + v1::value::Value::F64Value(_) => ColumnDataType::Float64, + v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, + v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, + v1::value::Value::StringValue(_) => ColumnDataType::String, + v1::value::Value::DateValue(_) => ColumnDataType::Date, + v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, + v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, + v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, + v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, + v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, + v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + }; + Some(value_type) +} + /// Convert [ConcreteDataType] to [ColumnDataType]. pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { let column_data_type = match data_type { @@ -186,3 +219,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType false } } + +// TODO(yingwen): Tests. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4b35a04aadc9..f6bcacd4bb3f 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows}; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -27,8 +27,8 @@ use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; use crate::metadata::{ColumnMetadata, RegionMetadata}; use crate::proto_util::{ - is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type, - to_proto_value, + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, + to_proto_semantic_type, to_proto_value, }; /// Options that affect the entire region. @@ -104,28 +104,51 @@ pub struct WriteRequest { impl WriteRequest { /// Returns a new request. - pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest { - let name_to_index = rows - .schema - .iter() - .enumerate() - .map(|(index, column)| (column.column_name.clone(), index)) - .collect(); - WriteRequest { + pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result { + let mut name_to_index = HashMap::with_capacity(rows.schema.len()); + for (index, column) in rows.schema.iter().enumerate() { + ensure!( + name_to_index + .insert(column.column_name.clone(), index) + .is_none(), + InvalidRequestSnafu { + region_id, + reason: format!("duplicate column {}", column.column_name), + } + ); + } + + Ok(WriteRequest { region_id, op_type, rows, name_to_index, - } + }) } - /// Validate the request. + /// Validates the request. + /// + /// Ensures rows match the schema. pub(crate) fn validate(&self) -> Result<()> { - // - checks whether the request is too large. - // - checks whether each row in rows has the same schema. - // - checks whether each column match the schema in Rows. - // - checks rows don't have duplicate columns. - unimplemented!() + for row in &self.rows.rows { + ensure!( + row.values.len() == self.rows.schema.len(), + InvalidRequestSnafu { + region_id: self.region_id, + reason: format!( + "row has {} columns but schema has {}", + row.values.len(), + self.rows.schema.len() + ), + } + ); + + for (value, column_schema) in row.values.iter().zip(&self.rows.schema) { + validate_proto_value(self.region_id, value, column_schema)?; + } + } + + Ok(()) } /// Get column index by name. @@ -274,6 +297,30 @@ impl WriteRequest { } } +/// Validate proto value schema. +pub(crate) fn validate_proto_value( + region_id: RegionId, + value: &Value, + column_schema: &ColumnSchema, +) -> Result<()> { + if let Some(value_type) = proto_value_type(value) { + ensure!( + value_type as i32 == column_schema.datatype, + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} has type {:?}, but schema has type {:?}", + column_schema.column_name, + value_type, + ColumnDataType::from_i32(column_schema.datatype) + ), + } + ); + } + + Ok(()) +} + /// Sender and write request. pub(crate) struct SenderWriteRequest { /// Result sender.