From fd812b545129ecd46d1864f7deb33d4d8543980c Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 23:00:08 +0800 Subject: [PATCH] feat: more validation to write request --- src/mito2/src/engine.rs | 2 - src/mito2/src/memtable/key_values.rs | 9 +- src/mito2/src/proto_util.rs | 17 ++ src/mito2/src/request.rs | 333 ++++++++++++++++++++++++++- 4 files changed, 343 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 87a70dd97709..30e6deefd358 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -91,8 +91,6 @@ impl MitoEngine { /// Write to a region. pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> { - write_request.validate()?; - let region = self .inner .workers diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 4ea70c3eae76..dbd8e12b9157 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -190,11 +190,12 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use greptime_proto::v1; - use greptime_proto::v1::{value, ColumnDataType, Value}; + use greptime_proto::v1::ColumnDataType; use store_api::storage::RegionId; use super::*; use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use crate::proto_util::i64_value; const TS_NAME: &str = "ts"; const START_SEQ: SequenceNumber = 100; @@ -290,12 +291,6 @@ mod tests { } } - fn i64_value(data: i64) -> Value { - Value { - value: Some(value::Value::I64Value(data)), - } - } - fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) { assert_eq!(num_rows, kvs.num_rows()); let mut expect_seq = START_SEQ; diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 1f8f6f0add68..d2de210c8330 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -153,6 +153,23 @@ pub(crate) fn proto_value_type(value: &v1::Value) -> Option { Some(value_type) } +// TODO(yingwen): Support conversion in greptime-proto. +/// Creates value for i64. +#[cfg(test)] +pub(crate) fn i64_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::I64Value(data)), + } +} + +/// Creates value for timestamp millis. +#[cfg(test)] +pub(crate) fn ts_ms_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(data)), + } +} + /// Convert [ConcreteDataType] to [ColumnDataType]. pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { let column_data_type = match data_type { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f6bcacd4bb3f..d70232bcc49d 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -100,6 +100,8 @@ pub struct WriteRequest { pub rows: Rows, /// Map column name to column index in `rows`. name_to_index: HashMap, + /// Whether each column has null. + has_null: Vec, } impl WriteRequest { @@ -118,18 +120,23 @@ impl WriteRequest { ); } - Ok(WriteRequest { + let has_null = vec![false; rows.schema.len()]; + let mut request = WriteRequest { region_id, op_type, rows, name_to_index, - }) + has_null, + }; + request.init()?; + + Ok(request) } - /// Validates the request. + /// Initailizes and validates the request. /// /// Ensures rows match the schema. - pub(crate) fn validate(&self) -> Result<()> { + fn init(&mut self) -> Result<()> { for row in &self.rows.rows { ensure!( row.values.len() == self.rows.schema.len(), @@ -143,8 +150,13 @@ impl WriteRequest { } ); - for (value, column_schema) in row.values.iter().zip(&self.rows.schema) { + for (i, (value, column_schema)) in row.values.iter().zip(&self.rows.schema).enumerate() + { validate_proto_value(self.region_id, value, column_schema)?; + + if value.value.is_none() { + self.has_null[i] = true; + } } } @@ -179,10 +191,12 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} expect type {:?}, given: {:?}({})", + "column {} expect type {:?}, given: {}({})", column.column_schema.name, column.column_schema.data_type, - ColumnDataType::from_i32(input_col.datatype), + ColumnDataType::from_i32(input_col.datatype) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.datatype, ) } @@ -194,14 +208,27 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} has semantic type {:?}, given: {:?}({})", + "column {} has semantic type {:?}, given: {}({})", column.column_schema.name, column.semantic_type, - greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.semantic_type ), } ); + + // Check nullable. + // Safety: `rows_columns` ensures this column exists. + let has_null = self.has_null[self.name_to_index[&column.column_schema.name]]; + ensure!( + !has_null || column.column_schema.is_nullable(), + InvalidRequestSnafu { + region_id, + reason: format!("column {} is not null", column.column_schema.name), + } + ); } else { // For columns not in rows, checks whether they have default value. ensure!( @@ -405,3 +432,291 @@ impl RequestBody { } } } + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use greptime_proto::v1::{Row, SemanticType}; + + use super::*; + use crate::error::Error; + use crate::metadata::RegionMetadataBuilder; + use crate::proto_util::{i64_value, ts_ms_value}; + + fn new_column_schema( + name: &str, + data_type: ColumnDataType, + semantic_type: SemanticType, + ) -> ColumnSchema { + ColumnSchema { + column_name: name.to_string(), + datatype: data_type as i32, + semantic_type: semantic_type as i32, + } + } + + fn check_invalid_request(err: &Error, expect: &str) { + if let Error::InvalidRequest { + region_id: _, + reason, + location: _, + } = err + { + assert_eq!(reason, expect); + } else { + panic!("Unexpected error {err}") + } + } + + #[test] + fn test_write_request_duplicate_column() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "duplicate column c0"); + } + + #[test] + fn test_valid_write_request() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + assert_eq!(0, request.column_index_by_name("c0").unwrap()); + assert_eq!(1, request.column_index_by_name("c1").unwrap()); + assert_eq!(None, request.column_index_by_name("c2")); + } + + #[test] + fn test_write_request_column_num() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2), i64_value(3)], + }], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "row has 3 columns but schema has 2"); + } + + fn new_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1), 1); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: crate::metadata::SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: crate::metadata::SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() + } + + #[test] + fn test_check_schema() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + request.check_schema(&metadata).unwrap(); + } + + #[test] + fn test_column_type() { + let rows = Rows { + schema: vec![ + new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)"); + } + + #[test] + fn test_semantic_type() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Tag, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)"); + } + + #[test] + fn test_column_nullable() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![Value { value: None }, i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts is not null"); + } + + #[test] + fn test_column_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "missing column ts"); + } + + #[test] + fn test_unknown_column() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2), i64_value(3)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, r#"unknown columns: ["k1"]"#); + } + + #[test] + fn test_fill_missing_columns() { + let rows = Rows { + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], + rows: vec![Row { + values: vec![ts_ms_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + request.fill_missing_columns(&metadata).unwrap(); + + let expect_rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), Value { value: None }], + }], + }; + assert_eq!(expect_rows, request.rows); + } + + #[test] + fn test_no_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.fill_missing_columns(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts does not have default value"); + } +}