From d3dba5262278da76e96053e526d2993f3c6e92b5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 8 Oct 2023 20:34:22 +0800 Subject: [PATCH 1/5] test: test different order --- src/mito2/src/engine/basic_test.rs | 129 +++++++++++++++++++++++++++ src/mito2/src/memtable/key_values.rs | 1 + 2 files changed, 130 insertions(+) diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 7caa75ea51ce..405a2569af14 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -16,10 +16,12 @@ use std::collections::HashMap; +use api::v1::value::ValueData; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; +use datatypes::prelude::ConcreteDataType; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; @@ -176,6 +178,133 @@ async fn test_write_query_region() { assert_eq!(expected, batches.pretty_print().unwrap()); } +#[tokio::test] +async fn test_write_different_order() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); + + // tag_0, tag_1, field_0, field_1, ts, + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Swap position of columns. + column_schemas.swap(0, 3); + column_schemas.swap(2, 4); + + // Now the schema is field_1, tag_1, ts, tag_0, field_0 + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::F64Value((i + 10) as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("b{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------+---------------------+ +| tag_0 | tag_1 | field_0 | field_1 | ts | ++-------+-------+---------+---------+---------------------+ +| a0 | b0 | 0.0 | 10.0 | 1970-01-01T00:00:00 | +| a1 | b1 | 1.0 | 11.0 | 1970-01-01T00:00:01 | +| a2 | b2 | 2.0 | 12.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_write_different_order2() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // tag_0, tag_1, field_0, field_1, ts, + let mut request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); + // Change the field type of field_1. + request.column_metadatas[3].column_schema.data_type = ConcreteDataType::string_datatype(); + + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Swap position of columns. + column_schemas.swap(2, 3); + + // Now the schema is tag_0, tag_1, field_1, field_0, ts + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("b{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue((i + 10).to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------+---------------------+ +| tag_0 | tag_1 | field_0 | field_1 | ts | ++-------+-------+---------+---------+---------------------+ +| a0 | b0 | 0.0 | 10 | 1970-01-01T00:00:00 | +| a1 | b1 | 1.0 | 11 | 1970-01-01T00:00:01 | +| a2 | b2 | 2.0 | 12 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + #[tokio::test] async fn test_put_delete() { let mut env = TestEnv::new(); diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 89c249a89871..2a03e44b695e 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -169,6 +169,7 @@ impl ReadRowHelper { .unwrap(); indices.push(*ts_index); // Iterate columns and find field columns. + // TODO(yingwen): use `field_columns()` for column in metadata.column_metadatas.iter() { if column.semantic_type == SemanticType::Field { // Get index in request for each field column. From 31c8e4e654742b0b08bb0fad9597060d19d5693e Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 Oct 2023 15:43:48 +0800 Subject: [PATCH 2/5] test: add tests for missing and invalid columns --- src/mito2/src/engine/basic_test.rs | 56 ++++++++++++++++++++++++++---- src/mito2/src/request.rs | 35 +++++++++++++++++-- 2 files changed, 82 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 405a2569af14..41ba877d3190 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -22,10 +22,11 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; -use store_api::region_request::RegionOpenRequest; +use store_api::region_request::{RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; use super::*; +use crate::error::Error; use crate::region::version::VersionControlData; use crate::test_util::{ build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema, @@ -179,9 +180,7 @@ async fn test_write_query_region() { } #[tokio::test] -async fn test_write_different_order() { - common_telemetry::init_default_ut_logging(); - +async fn test_different_order() { let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; @@ -242,9 +241,7 @@ async fn test_write_different_order() { } #[tokio::test] -async fn test_write_different_order2() { - common_telemetry::init_default_ut_logging(); - +async fn test_different_order_and_type() { let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; @@ -416,3 +413,48 @@ async fn test_put_overwrite() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_absent_and_invalid_columns() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // tag_0, field_0, field_1, ts, + let request = CreateRequestBuilder::new().field_num(2).build(); + + let mut column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Change the type of field_1 in input. + column_schemas[2].datatype = api::v1::ColumnDataType::String as i32; + // Input tag_0, field_1 (invalid type string), ts + column_schemas.remove(1); + let rows = (0..3) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(format!("a{i}"))), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(i.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), + }, + ], + }) + .collect(); + let rows = Rows { + schema: column_schemas, + rows, + }; + let err = engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f34d8baafe13..f7688b6b1f26 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -683,6 +683,7 @@ pub(crate) struct CompactionFailed { #[cfg(test)] mod tests { + use api::v1::value::ValueData; use api::v1::{Row, SemanticType}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; @@ -950,7 +951,7 @@ mod tests { assert_eq!(expect_rows, request.rows); } - fn region_metadata_for_delete() -> RegionMetadata { + fn region_metadata_two_fields() -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); builder .push_column_metadata(ColumnMetadata { @@ -1010,7 +1011,7 @@ mod tests { values: vec![ts_ms_value(1)], }], }; - let metadata = region_metadata_for_delete(); + let metadata = region_metadata_two_fields(); let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); @@ -1078,4 +1079,34 @@ mod tests { let err = request.fill_missing_columns(&metadata).unwrap_err(); check_invalid_request(&err, "column ts does not have default value"); } + + #[test] + fn test_missing_and_invalid() { + // Missing f0 and f1 has invalid type (string). + let rows = Rows { + schema: vec![ + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("f1", ColumnDataType::String, SemanticType::Field), + ], + rows: vec![Row { + values: vec![ + i64_value(100), + ts_ms_value(1), + Value { + value_data: Some(ValueData::StringValue("xxxxx".to_string())), + }, + ], + }], + }; + let metadata = region_metadata_two_fields(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "xxx"); + } } From 97cffd1d47eb18d58f38ca3096a7341b1b535d04 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 Oct 2023 15:51:57 +0800 Subject: [PATCH 3/5] fix: do not skip schema validation while missing columns --- src/mito2/src/engine/basic_test.rs | 1 - src/mito2/src/request.rs | 11 +++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 41ba877d3190..d16e16bfbaca 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -26,7 +26,6 @@ use store_api::region_request::{RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; use super::*; -use crate::error::Error; use crate::region::version::VersionControlData; use crate::test_util::{ build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f7688b6b1f26..ee4c77e11162 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -145,6 +145,7 @@ impl WriteRequest { .map(|column| (&column.column_name, column)) .collect(); + let mut need_fill_default = false; // Checks all columns in this region. for column in &metadata.column_metadatas { if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { @@ -199,7 +200,7 @@ impl WriteRequest { // Rows don't have this column. self.check_missing_column(column)?; - return FillDefaultSnafu { region_id }.fail(); + need_fill_default = true; } } @@ -213,6 +214,9 @@ impl WriteRequest { .fail(); } + // If we need to fill default values, return a special error. + ensure!(!need_fill_default, FillDefaultSnafu { region_id }); + Ok(()) } @@ -1107,6 +1111,9 @@ mod tests { let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); - check_invalid_request(&err, "xxx"); + check_invalid_request( + &err, + "column f1 expect type Int64(Int64Type), given: STRING(12)", + ); } } From c700b70f3bf6bbe01f3716c98cef3290efb5fbcb Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 Oct 2023 16:06:07 +0800 Subject: [PATCH 4/5] chore: use field_columns() --- src/mito2/src/memtable/key_values.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 2a03e44b695e..33f5bdc75b53 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; +use api::v1::{Mutation, OpType, Row, Rows}; use datatypes::value::ValueRef; use store_api::metadata::RegionMetadata; use store_api::storage::SequenceNumber; @@ -169,13 +169,10 @@ impl ReadRowHelper { .unwrap(); indices.push(*ts_index); // Iterate columns and find field columns. - // TODO(yingwen): use `field_columns()` - for column in metadata.column_metadatas.iter() { - if column.semantic_type == SemanticType::Field { - // Get index in request for each field column. - let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); - } + for column in metadata.field_columns() { + // Get index in request for each field column. + let index = name_to_index.get(&column.column_schema.name).unwrap(); + indices.push(*index); } ReadRowHelper { @@ -187,8 +184,7 @@ impl ReadRowHelper { #[cfg(test)] mod tests { - use api::v1; - use api::v1::ColumnDataType; + use api::v1::{self, ColumnDataType, SemanticType}; use super::*; use crate::test_util::i64_value; From a08f24a7fa05f7c6beb5ec99cfdc240e1debddb7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 Oct 2023 16:33:22 +0800 Subject: [PATCH 5/5] test: add tests for different column order --- .../insert/insert_different_order.result | 44 +++++++++++++++++++ .../common/insert/insert_different_order.sql | 15 +++++++ 2 files changed, 59 insertions(+) create mode 100644 tests/cases/standalone/common/insert/insert_different_order.result create mode 100644 tests/cases/standalone/common/insert/insert_different_order.sql diff --git a/tests/cases/standalone/common/insert/insert_different_order.result b/tests/cases/standalone/common/insert/insert_different_order.result new file mode 100644 index 000000000000..10c7cda2fe95 --- /dev/null +++ b/tests/cases/standalone/common/insert/insert_different_order.result @@ -0,0 +1,44 @@ +CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1)); + +Affected Rows: 0 + +INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1); + +Affected Rows: 1 + +INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2); + +Affected Rows: 1 + +INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1'); + +Affected Rows: 1 + +INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1'); + +Affected Rows: 1 + +SELECT * from different_order order by t; + ++----+----+----+----+-------------------------+ +| k0 | k1 | v0 | v1 | t | ++----+----+----+----+-------------------------+ +| a0 | b0 | 1 | 11 | 1970-01-01T00:00:00.001 | +| a1 | | 2 | 12 | 1970-01-01T00:00:00.002 | +| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 | +| a2 | b1 | | | 1970-01-01T00:00:00.004 | ++----+----+----+----+-------------------------+ + +SELECT * from different_order WHERE k0 = 'a2' order by t; + ++----+----+----+----+-------------------------+ +| k0 | k1 | v0 | v1 | t | ++----+----+----+----+-------------------------+ +| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 | +| a2 | b1 | | | 1970-01-01T00:00:00.004 | ++----+----+----+----+-------------------------+ + +DROP TABLE different_order; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/insert/insert_different_order.sql b/tests/cases/standalone/common/insert/insert_different_order.sql new file mode 100644 index 000000000000..725a931036cc --- /dev/null +++ b/tests/cases/standalone/common/insert/insert_different_order.sql @@ -0,0 +1,15 @@ +CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1)); + +INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1); + +INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2); + +INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1'); + +INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1'); + +SELECT * from different_order order by t; + +SELECT * from different_order WHERE k0 = 'a2' order by t; + +DROP TABLE different_order;