From d4707891773a79c48ea53ddcd094272a40b0fd7a Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 9 Oct 2023 17:20:51 +0800 Subject: [PATCH] fix: schema validation is skipped once we need to fill a column (#2548) * test: test different order * test: add tests for missing and invalid columns * fix: do not skip schema validation while missing columns * chore: use field_columns() * test: add tests for different column order --- src/mito2/src/engine/basic_test.rs | 172 +++++++++++++++++- src/mito2/src/memtable/key_values.rs | 15 +- src/mito2/src/request.rs | 44 ++++- .../insert/insert_different_order.result | 44 +++++ .../common/insert/insert_different_order.sql | 15 ++ 5 files changed, 277 insertions(+), 13 deletions(-) create mode 100644 tests/cases/standalone/common/insert/insert_different_order.result create mode 100644 tests/cases/standalone/common/insert/insert_different_order.sql diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 7caa75ea51ce..d16e16bfbaca 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -16,11 +16,13 @@ use std::collections::HashMap; +use api::v1::value::ValueData; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; -use store_api::region_request::RegionOpenRequest; +use datatypes::prelude::ConcreteDataType; +use store_api::region_request::{RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; use super::*; @@ -176,6 +178,129 @@ async fn test_write_query_region() { assert_eq!(expected, batches.pretty_print().unwrap()); } +#[tokio::test] +async fn test_different_order() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); + + // tag_0, tag_1, field_0, field_1, ts, + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Swap position of columns. + column_schemas.swap(0, 3); + column_schemas.swap(2, 4); + + // Now the schema is field_1, tag_1, ts, tag_0, field_0 + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::F64Value((i + 10) as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("b{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------+---------------------+ +| tag_0 | tag_1 | field_0 | field_1 | ts | ++-------+-------+---------+---------+---------------------+ +| a0 | b0 | 0.0 | 10.0 | 1970-01-01T00:00:00 | +| a1 | b1 | 1.0 | 11.0 | 1970-01-01T00:00:01 | +| a2 | b2 | 2.0 | 12.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_different_order_and_type() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // tag_0, tag_1, field_0, field_1, ts, + let mut request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); + // Change the field type of field_1. + request.column_metadatas[3].column_schema.data_type = ConcreteDataType::string_datatype(); + + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Swap position of columns. + column_schemas.swap(2, 3); + + // Now the schema is tag_0, tag_1, field_1, field_0, ts + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("b{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue((i + 10).to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------+---------------------+ +| tag_0 | tag_1 | field_0 | field_1 | ts | ++-------+-------+---------+---------+---------------------+ +| a0 | b0 | 0.0 | 10 | 1970-01-01T00:00:00 | +| a1 | b1 | 1.0 | 11 | 1970-01-01T00:00:01 | +| a2 | b2 | 2.0 | 12 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + #[tokio::test] async fn test_put_delete() { let mut env = TestEnv::new(); @@ -287,3 +412,48 @@ async fn test_put_overwrite() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_absent_and_invalid_columns() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // tag_0, field_0, field_1, ts, + let request = CreateRequestBuilder::new().field_num(2).build(); + + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Change the type of field_1 in input. + column_schemas[2].datatype = api::v1::ColumnDataType::String as i32; + // Input tag_0, field_1 (invalid type string), ts + column_schemas.remove(1); + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(i.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + let err = engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); +} diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 89c249a89871..33f5bdc75b53 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; +use api::v1::{Mutation, OpType, Row, Rows}; use datatypes::value::ValueRef; use store_api::metadata::RegionMetadata; use store_api::storage::SequenceNumber; @@ -169,12 +169,10 @@ impl ReadRowHelper { .unwrap(); indices.push(*ts_index); // Iterate columns and find field columns. - for column in metadata.column_metadatas.iter() { - if column.semantic_type == SemanticType::Field { - // Get index in request for each field column. - let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); - } + for column in metadata.field_columns() { + // Get index in request for each field column. + let index = name_to_index.get(&column.column_schema.name).unwrap(); + indices.push(*index); } ReadRowHelper { @@ -186,8 +184,7 @@ impl ReadRowHelper { #[cfg(test)] mod tests { - use api::v1; - use api::v1::ColumnDataType; + use api::v1::{self, ColumnDataType, SemanticType}; use super::*; use crate::test_util::i64_value; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f34d8baafe13..ee4c77e11162 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -145,6 +145,7 @@ impl WriteRequest { .map(|column| (&column.column_name, column)) .collect(); + let mut need_fill_default = false; // Checks all columns in this region. for column in &metadata.column_metadatas { if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { @@ -199,7 +200,7 @@ impl WriteRequest { // Rows don't have this column. self.check_missing_column(column)?; - return FillDefaultSnafu { region_id }.fail(); + need_fill_default = true; } } @@ -213,6 +214,9 @@ impl WriteRequest { .fail(); } + // If we need to fill default values, return a special error. + ensure!(!need_fill_default, FillDefaultSnafu { region_id }); + Ok(()) } @@ -683,6 +687,7 @@ pub(crate) struct CompactionFailed { #[cfg(test)] mod tests { + use api::v1::value::ValueData; use api::v1::{Row, SemanticType}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; @@ -950,7 +955,7 @@ mod tests { assert_eq!(expect_rows, request.rows); } - fn region_metadata_for_delete() -> RegionMetadata { + fn region_metadata_two_fields() -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); builder .push_column_metadata(ColumnMetadata { @@ -1010,7 +1015,7 @@ mod tests { values: vec![ts_ms_value(1)], }], }; - let metadata = region_metadata_for_delete(); + let metadata = region_metadata_two_fields(); let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); @@ -1078,4 +1083,37 @@ mod tests { let err = request.fill_missing_columns(&metadata).unwrap_err(); check_invalid_request(&err, "column ts does not have default value"); } + + #[test] + fn test_missing_and_invalid() { + // Missing f0 and f1 has invalid type (string). + let rows = Rows { + schema: vec![ + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("f1", ColumnDataType::String, SemanticType::Field), + ], + rows: vec![Row { + values: vec![ + i64_value(100), + ts_ms_value(1), + Value { + value_data: Some(ValueData::StringValue("xxxxx".to_string())), + }, + ], + }], + }; + let metadata = region_metadata_two_fields(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request( + &err, + "column f1 expect type Int64(Int64Type), given: STRING(12)", + ); + } } diff --git a/tests/cases/standalone/common/insert/insert_different_order.result b/tests/cases/standalone/common/insert/insert_different_order.result new file mode 100644 index 000000000000..10c7cda2fe95 --- /dev/null +++ b/tests/cases/standalone/common/insert/insert_different_order.result @@ -0,0 +1,44 @@ +CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1)); + +Affected Rows: 0 + +INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1); + +Affected Rows: 1 + +INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2); + +Affected Rows: 1 + +INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1'); + +Affected Rows: 1 + +INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1'); + +Affected Rows: 1 + +SELECT * from different_order order by t; + ++----+----+----+----+-------------------------+ +| k0 | k1 | v0 | v1 | t | ++----+----+----+----+-------------------------+ +| a0 | b0 | 1 | 11 | 1970-01-01T00:00:00.001 | +| a1 | | 2 | 12 | 1970-01-01T00:00:00.002 | +| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 | +| a2 | b1 | | | 1970-01-01T00:00:00.004 | ++----+----+----+----+-------------------------+ + +SELECT * from different_order WHERE k0 = 'a2' order by t; + ++----+----+----+----+-------------------------+ +| k0 | k1 | v0 | v1 | t | ++----+----+----+----+-------------------------+ +| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 | +| a2 | b1 | | | 1970-01-01T00:00:00.004 | ++----+----+----+----+-------------------------+ + +DROP TABLE different_order; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/insert/insert_different_order.sql b/tests/cases/standalone/common/insert/insert_different_order.sql new file mode 100644 index 000000000000..725a931036cc --- /dev/null +++ b/tests/cases/standalone/common/insert/insert_different_order.sql @@ -0,0 +1,15 @@ +CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1)); + +INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1); + +INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2); + +INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1'); + +INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1'); + +SELECT * from different_order order by t; + +SELECT * from different_order WHERE k0 = 'a2' order by t; + +DROP TABLE different_order;