Skip to content

Commit

Permalink
fix: schema validation is skipped once we need to fill a column (#2548)
Browse files Browse the repository at this point in the history
* test: test different order

* test: add tests for missing and invalid columns

* fix: do not skip schema validation while missing columns

* chore: use field_columns()

* test: add tests for different column order
  • Loading branch information
evenyag authored Oct 9, 2023
1 parent 9b3470b commit d68dd1f
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 13 deletions.
172 changes: 171 additions & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_request::RegionOpenRequest;
use datatypes::prelude::ConcreteDataType;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;

use super::*;
Expand Down Expand Up @@ -176,6 +178,129 @@ async fn test_write_query_region() {
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_different_order() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let create = CreateRequestBuilder::new().tag_num(2).field_num(2).build();

    // Region order: tag_0, tag_1, field_0, field_1, ts.
    let mut schema = rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Shuffle the input columns so they no longer match the region order.
    schema.swap(0, 3);
    schema.swap(2, 4);

    // Input order is now: field_1, tag_1, ts, tag_0, field_0.
    let str_val = |s: String| api::v1::Value {
        value_data: Some(ValueData::StringValue(s)),
    };
    let f64_val = |v: f64| api::v1::Value {
        value_data: Some(ValueData::F64Value(v)),
    };
    let ts_val = |ts: i64| api::v1::Value {
        value_data: Some(ValueData::TimestampMillisecondValue(ts)),
    };
    let mut rows = Vec::with_capacity(3);
    for i in 0..3 {
        rows.push(api::v1::Row {
            values: vec![
                f64_val((i + 10) as f64),
                str_val(format!("b{i}")),
                ts_val(i as i64 * 1000),
                str_val(format!("a{i}")),
                f64_val(i as f64),
            ],
        });
    }
    put_rows(&engine, region_id, Rows { schema, rows }).await;

    // The engine must reorder the columns; scan output is in region order.
    let stream = engine
        .handle_query(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+-------+---------+---------+---------------------+
| tag_0 | tag_1 | field_0 | field_1 | ts                  |
+-------+-------+---------+---------+---------------------+
| a0    | b0    | 0.0     | 10.0    | 1970-01-01T00:00:00 |
| a1    | b1    | 1.0     | 11.0    | 1970-01-01T00:00:01 |
| a2    | b2    | 2.0     | 12.0    | 1970-01-01T00:00:02 |
+-------+-------+---------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_different_order_and_type() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    // Region order: tag_0, tag_1, field_0, field_1, ts.
    let mut create = CreateRequestBuilder::new().tag_num(2).field_num(2).build();
    // Make field_1 a string column so the two fields have different types.
    create.column_metadatas[3].column_schema.data_type = ConcreteDataType::string_datatype();

    let mut schema = rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Swap the two field columns in the input.
    schema.swap(2, 3);

    // Input order is now: tag_0, tag_1, field_1, field_0, ts.
    let str_val = |s: String| api::v1::Value {
        value_data: Some(ValueData::StringValue(s)),
    };
    let mut rows = Vec::with_capacity(3);
    for i in 0..3 {
        rows.push(api::v1::Row {
            values: vec![
                str_val(format!("a{i}")),
                str_val(format!("b{i}")),
                str_val((i + 10).to_string()),
                api::v1::Value {
                    value_data: Some(ValueData::F64Value(i as f64)),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
                },
            ],
        });
    }
    put_rows(&engine, region_id, Rows { schema, rows }).await;

    // Values must land in the right columns despite the swapped input order.
    let stream = engine
        .handle_query(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+-------+---------+---------+---------------------+
| tag_0 | tag_1 | field_0 | field_1 | ts                  |
+-------+-------+---------+---------+---------------------+
| a0    | b0    | 0.0     | 10      | 1970-01-01T00:00:00 |
| a1    | b1    | 1.0     | 11      | 1970-01-01T00:00:01 |
| a2    | b2    | 2.0     | 12      | 1970-01-01T00:00:02 |
+-------+-------+---------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_put_delete() {
let mut env = TestEnv::new();
Expand Down Expand Up @@ -287,3 +412,48 @@ async fn test_put_overwrite() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_absent_and_invalid_columns() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    // Region order: tag_0, field_0, field_1, ts.
    let create = CreateRequestBuilder::new().field_num(2).build();

    let mut schema = rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Declare field_1 with the wrong type (string) in the input schema.
    schema[2].datatype = api::v1::ColumnDataType::String as i32;
    // Drop field_0 so the input is: tag_0, field_1 (invalid type), ts.
    schema.remove(1);

    let mut rows = Vec::with_capacity(3);
    for i in 0..3 {
        rows.push(api::v1::Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(format!("a{i}"))),
                },
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(i.to_string())),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
                },
            ],
        });
    }
    // The put must be rejected: the type mismatch is an error even though
    // a missing column would otherwise trigger default-value filling.
    let err = engine
        .handle_request(
            region_id,
            RegionRequest::Put(RegionPutRequest {
                rows: Rows { schema, rows },
            }),
        )
        .await
        .unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
15 changes: 6 additions & 9 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
use api::v1::{Mutation, OpType, Row, Rows};
use datatypes::value::ValueRef;
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
Expand Down Expand Up @@ -169,12 +169,10 @@ impl ReadRowHelper {
.unwrap();
indices.push(*ts_index);
// Iterate columns and find field columns.
for column in metadata.column_metadatas.iter() {
if column.semantic_type == SemanticType::Field {
// Get index in request for each field column.
let index = name_to_index.get(&column.column_schema.name).unwrap();
indices.push(*index);
}
for column in metadata.field_columns() {
// Get index in request for each field column.
let index = name_to_index.get(&column.column_schema.name).unwrap();
indices.push(*index);
}

ReadRowHelper {
Expand All @@ -186,8 +184,7 @@ impl ReadRowHelper {

#[cfg(test)]
mod tests {
use api::v1;
use api::v1::ColumnDataType;
use api::v1::{self, ColumnDataType, SemanticType};

use super::*;
use crate::test_util::i64_value;
Expand Down
44 changes: 41 additions & 3 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl WriteRequest {
.map(|column| (&column.column_name, column))
.collect();

let mut need_fill_default = false;
// Checks all columns in this region.
for column in &metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl WriteRequest {
// Rows don't have this column.
self.check_missing_column(column)?;

return FillDefaultSnafu { region_id }.fail();
need_fill_default = true;
}
}

Expand All @@ -213,6 +214,9 @@ impl WriteRequest {
.fail();
}

// If we need to fill default values, return a special error.
ensure!(!need_fill_default, FillDefaultSnafu { region_id });

Ok(())
}

Expand Down Expand Up @@ -683,6 +687,7 @@ pub(crate) struct CompactionFailed {

#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{Row, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
Expand Down Expand Up @@ -950,7 +955,7 @@ mod tests {
assert_eq!(expect_rows, request.rows);
}

fn region_metadata_for_delete() -> RegionMetadata {
fn region_metadata_two_fields() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
Expand Down Expand Up @@ -1010,7 +1015,7 @@ mod tests {
values: vec![ts_ms_value(1)],
}],
};
let metadata = region_metadata_for_delete();
let metadata = region_metadata_two_fields();

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
Expand Down Expand Up @@ -1078,4 +1083,37 @@ mod tests {
let err = request.fill_missing_columns(&metadata).unwrap_err();
check_invalid_request(&err, "column ts does not have default value");
}

#[test]
fn test_missing_and_invalid() {
    // f0 is absent and f1 carries an invalid type (string instead of i64).
    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let row = Row {
        values: vec![
            i64_value(100),
            ts_ms_value(1),
            Value {
                value_data: Some(ValueData::StringValue("xxxxx".to_string())),
            },
        ],
    };
    let input = Rows {
        schema,
        rows: vec![row],
    };
    let metadata = region_metadata_two_fields();

    // The type error must be reported instead of being masked by the
    // fill-default path for the missing f0 column.
    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, input).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
}
44 changes: 44 additions & 0 deletions tests/cases/standalone/common/insert/insert_different_order.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1));

Affected Rows: 0

INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1);

Affected Rows: 1

INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2);

Affected Rows: 1

INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1');

Affected Rows: 1

INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1');

Affected Rows: 1

SELECT * from different_order order by t;

+----+----+----+----+-------------------------+
| k0 | k1 | v0 | v1 | t |
+----+----+----+----+-------------------------+
| a0 | b0 | 1 | 11 | 1970-01-01T00:00:00.001 |
| a1 | | 2 | 12 | 1970-01-01T00:00:00.002 |
| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 |
| a2 | b1 | | | 1970-01-01T00:00:00.004 |
+----+----+----+----+-------------------------+

SELECT * from different_order WHERE k0 = 'a2' order by t;

+----+----+----+----+-------------------------+
| k0 | k1 | v0 | v1 | t |
+----+----+----+----+-------------------------+
| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 |
| a2 | b1 | | | 1970-01-01T00:00:00.004 |
+----+----+----+----+-------------------------+

DROP TABLE different_order;

Affected Rows: 0

15 changes: 15 additions & 0 deletions tests/cases/standalone/common/insert/insert_different_order.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1));

INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1);

INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2);

INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1');

INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1');

SELECT * from different_order order by t;

SELECT * from different_order WHERE k0 = 'a2' order by t;

DROP TABLE different_order;

0 comments on commit d68dd1f

Please sign in to comment.