Skip to content

Commit

Permalink
feat: validate write request
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Aug 11, 2023
1 parent 7050380 commit 962bd85
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 18 deletions.
35 changes: 35 additions & 0 deletions src/mito2/src/proto_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,39 @@ pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
Some(proto_value)
}

/// Returns the [ColumnDataType] of the value.
///
/// If value is null, returns `None`.
pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
let value_data = value.value.as_ref()?;
let value_type = match value_data {
v1::value::Value::I8Value(_) => ColumnDataType::Int8,
v1::value::Value::I16Value(_) => ColumnDataType::Int16,
v1::value::Value::I32Value(_) => ColumnDataType::Int32,
v1::value::Value::I64Value(_) => ColumnDataType::Int64,
v1::value::Value::U8Value(_) => ColumnDataType::Uint8,
v1::value::Value::U16Value(_) => ColumnDataType::Uint16,
v1::value::Value::U32Value(_) => ColumnDataType::Uint32,
v1::value::Value::U64Value(_) => ColumnDataType::Uint64,
v1::value::Value::F32Value(_) => ColumnDataType::Float32,
v1::value::Value::F64Value(_) => ColumnDataType::Float64,
v1::value::Value::BoolValue(_) => ColumnDataType::Boolean,
v1::value::Value::BinaryValue(_) => ColumnDataType::Binary,
v1::value::Value::StringValue(_) => ColumnDataType::String,
v1::value::Value::DateValue(_) => ColumnDataType::Date,
v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime,
v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond,
v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond,
v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
};
Some(value_type)
}

/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
Expand Down Expand Up @@ -186,3 +219,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType
false
}
}

// TODO(yingwen): Tests.
83 changes: 65 additions & 18 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows};
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
Expand All @@ -27,8 +27,8 @@ use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::proto_util::{
is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type,
to_proto_value,
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
to_proto_semantic_type, to_proto_value,
};

/// Options that affect the entire region.
Expand Down Expand Up @@ -104,28 +104,51 @@ pub struct WriteRequest {

impl WriteRequest {
/// Returns a new request.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest {
let name_to_index = rows
.schema
.iter()
.enumerate()
.map(|(index, column)| (column.column_name.clone(), index))
.collect();
WriteRequest {
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result<WriteRequest> {
let mut name_to_index = HashMap::with_capacity(rows.schema.len());
for (index, column) in rows.schema.iter().enumerate() {
ensure!(
name_to_index
.insert(column.column_name.clone(), index)
.is_none(),
InvalidRequestSnafu {
region_id,
reason: format!("duplicate column {}", column.column_name),
}
);
}

Ok(WriteRequest {
region_id,
op_type,
rows,
name_to_index,
}
})
}

/// Validate the request.
/// Validates the request.
///
/// Ensures rows match the schema.
pub(crate) fn validate(&self) -> Result<()> {
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks whether each column match the schema in Rows.
// - checks rows don't have duplicate columns.
unimplemented!()
for row in &self.rows.rows {
ensure!(
row.values.len() == self.rows.schema.len(),
InvalidRequestSnafu {
region_id: self.region_id,
reason: format!(
"row has {} columns but schema has {}",
row.values.len(),
self.rows.schema.len()
),
}
);

for (value, column_schema) in row.values.iter().zip(&self.rows.schema) {
validate_proto_value(self.region_id, value, column_schema)?;
}
}

Ok(())
}

/// Get column index by name.
Expand Down Expand Up @@ -274,6 +297,30 @@ impl WriteRequest {
}
}

/// Validate proto value schema.
pub(crate) fn validate_proto_value(
region_id: RegionId,
value: &Value,
column_schema: &ColumnSchema,
) -> Result<()> {
if let Some(value_type) = proto_value_type(value) {
ensure!(
value_type as i32 == column_schema.datatype,
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} has type {:?}, but schema has type {:?}",
column_schema.column_name,
value_type,
ColumnDataType::from_i32(column_schema.datatype)
),
}
);
}

Ok(())
}

/// Sender and write request.
pub(crate) struct SenderWriteRequest {
/// Result sender.
Expand Down

0 comments on commit 962bd85

Please sign in to comment.