Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: schema validation is skipped once we need to fill a column #2548

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 171 additions & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_request::RegionOpenRequest;
use datatypes::prelude::ConcreteDataType;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;

use super::*;
Expand Down Expand Up @@ -176,6 +178,129 @@ async fn test_write_query_region() {
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_different_order() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build();

    // Region column order: tag_0, tag_1, field_0, field_1, ts.
    let mut column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Reorder the input schema so it no longer matches the region's
    // column order; after the swaps it is: field_1, tag_1, ts, tag_0, field_0.
    column_schemas.swap(0, 3);
    column_schemas.swap(2, 4);

    // Build three rows whose values follow the swapped schema order.
    let value = |data| api::v1::Value {
        value_data: Some(data),
    };
    let rows: Vec<_> = (0..3)
        .map(|i| api::v1::Row {
            values: vec![
                value(ValueData::F64Value((i + 10) as f64)),
                value(ValueData::StringValue(format!("b{i}"))),
                value(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
                value(ValueData::StringValue(format!("a{i}"))),
                value(ValueData::F64Value(i as f64)),
            ],
        })
        .collect();
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas,
            rows,
        },
    )
    .await;

    // Scanning must still yield columns in the region's own order.
    let stream = engine
        .handle_query(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+-------+---------+---------+---------------------+
| tag_0 | tag_1 | field_0 | field_1 | ts |
+-------+-------+---------+---------+---------------------+
| a0 | b0 | 0.0 | 10.0 | 1970-01-01T00:00:00 |
| a1 | b1 | 1.0 | 11.0 | 1970-01-01T00:00:01 |
| a2 | b2 | 2.0 | 12.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_different_order_and_type() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    // Region column order: tag_0, tag_1, field_0, field_1, ts.
    let mut request = CreateRequestBuilder::new().tag_num(2).field_num(2).build();
    // Store field_1 as a string column instead of the builder's default type.
    request.column_metadatas[3].column_schema.data_type = ConcreteDataType::string_datatype();

    let mut column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Swap field_0 and field_1 so the input order becomes:
    // tag_0, tag_1, field_1, field_0, ts.
    column_schemas.swap(2, 3);

    // Helper for string-valued cells; other cells are built inline.
    let string_value = |s: String| api::v1::Value {
        value_data: Some(ValueData::StringValue(s)),
    };
    let rows = (0..3)
        .map(|i| api::v1::Row {
            values: vec![
                string_value(format!("a{i}")),
                string_value(format!("b{i}")),
                string_value((i + 10).to_string()),
                api::v1::Value {
                    value_data: Some(ValueData::F64Value(i as f64)),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
                },
            ],
        })
        .collect();
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas,
            rows,
        },
    )
    .await;

    // Scanning must yield columns in the region's own order with the
    // string field printed as-is.
    let stream = engine
        .handle_query(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+-------+---------+---------+---------------------+
| tag_0 | tag_1 | field_0 | field_1 | ts |
+-------+-------+---------+---------+---------------------+
| a0 | b0 | 0.0 | 10 | 1970-01-01T00:00:00 |
| a1 | b1 | 1.0 | 11 | 1970-01-01T00:00:01 |
| a2 | b2 | 2.0 | 12 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_put_delete() {
let mut env = TestEnv::new();
Expand Down Expand Up @@ -287,3 +412,48 @@ async fn test_put_overwrite() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_absent_and_invalid_columns() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    // Region column order: tag_0, field_0, field_1, ts.
    let request = CreateRequestBuilder::new().field_num(2).build();

    let mut column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Declare field_1 with a string type in the input (a mismatch with
    // the region) ...
    column_schemas[2].datatype = api::v1::ColumnDataType::String as i32;
    // ... and drop field_0 entirely, so the input is both missing a
    // column and carrying an invalid type: tag_0, field_1 (string), ts.
    column_schemas.remove(1);
    let rows: Vec<_> = (0..3)
        .map(|i| api::v1::Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(format!("a{i}"))),
                },
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(i.to_string())),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
                },
            ],
        })
        .collect();
    let put_request = RegionPutRequest {
        rows: Rows {
            schema: column_schemas,
            rows,
        },
    };
    // The type mismatch must surface as InvalidArguments even though a
    // column would also need to be filled with its default value.
    let err = engine
        .handle_request(region_id, RegionRequest::Put(put_request))
        .await
        .unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
15 changes: 6 additions & 9 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
use api::v1::{Mutation, OpType, Row, Rows};
use datatypes::value::ValueRef;
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
Expand Down Expand Up @@ -169,12 +169,10 @@ impl ReadRowHelper {
.unwrap();
indices.push(*ts_index);
// Iterate columns and find field columns.
for column in metadata.column_metadatas.iter() {
if column.semantic_type == SemanticType::Field {
// Get index in request for each field column.
let index = name_to_index.get(&column.column_schema.name).unwrap();
indices.push(*index);
}
for column in metadata.field_columns() {
// Get index in request for each field column.
let index = name_to_index.get(&column.column_schema.name).unwrap();
indices.push(*index);
}

ReadRowHelper {
Expand All @@ -186,8 +184,7 @@ impl ReadRowHelper {

#[cfg(test)]
mod tests {
use api::v1;
use api::v1::ColumnDataType;
use api::v1::{self, ColumnDataType, SemanticType};

use super::*;
use crate::test_util::i64_value;
Expand Down
44 changes: 41 additions & 3 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl WriteRequest {
.map(|column| (&column.column_name, column))
.collect();

let mut need_fill_default = false;
// Checks all columns in this region.
for column in &metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl WriteRequest {
// Rows don't have this column.
self.check_missing_column(column)?;

return FillDefaultSnafu { region_id }.fail();
need_fill_default = true;
}
}

Expand All @@ -213,6 +214,9 @@ impl WriteRequest {
.fail();
}

// If we need to fill default values, return a special error.
ensure!(!need_fill_default, FillDefaultSnafu { region_id });

Ok(())
}

Expand Down Expand Up @@ -683,6 +687,7 @@ pub(crate) struct CompactionFailed {

#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{Row, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
Expand Down Expand Up @@ -950,7 +955,7 @@ mod tests {
assert_eq!(expect_rows, request.rows);
}

fn region_metadata_for_delete() -> RegionMetadata {
fn region_metadata_two_fields() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
Expand Down Expand Up @@ -1010,7 +1015,7 @@ mod tests {
values: vec![ts_ms_value(1)],
}],
};
let metadata = region_metadata_for_delete();
let metadata = region_metadata_two_fields();

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
Expand Down Expand Up @@ -1078,4 +1083,37 @@ mod tests {
let err = request.fill_missing_columns(&metadata).unwrap_err();
check_invalid_request(&err, "column ts does not have default value");
}

#[test]
fn test_missing_and_invalid() {
    // The input omits f0 and declares f1 with an invalid type (string
    // where the region expects int64).
    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let row = Row {
        values: vec![
            i64_value(100),
            ts_ms_value(1),
            Value {
                value_data: Some(ValueData::StringValue("xxxxx".to_string())),
            },
        ],
    };
    let rows = Rows {
        schema,
        rows: vec![row],
    };
    let metadata = region_metadata_two_fields();

    // check_schema must report the type mismatch instead of the
    // fill-default marker for the missing column.
    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
}
44 changes: 44 additions & 0 deletions tests/cases/standalone/common/insert/insert_different_order.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1));

Affected Rows: 0

INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1);

Affected Rows: 1

INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2);

Affected Rows: 1

INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1');

Affected Rows: 1

INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1');

Affected Rows: 1

SELECT * from different_order order by t;

+----+----+----+----+-------------------------+
| k0 | k1 | v0 | v1 | t |
+----+----+----+----+-------------------------+
| a0 | b0 | 1 | 11 | 1970-01-01T00:00:00.001 |
| a1 | | 2 | 12 | 1970-01-01T00:00:00.002 |
| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 |
| a2 | b1 | | | 1970-01-01T00:00:00.004 |
+----+----+----+----+-------------------------+

SELECT * from different_order WHERE k0 = 'a2' order by t;

+----+----+----+----+-------------------------+
| k0 | k1 | v0 | v1 | t |
+----+----+----+----+-------------------------+
| a2 | b1 | | 13 | 1970-01-01T00:00:00.003 |
| a2 | b1 | | | 1970-01-01T00:00:00.004 |
+----+----+----+----+-------------------------+

DROP TABLE different_order;

Affected Rows: 0

15 changes: 15 additions & 0 deletions tests/cases/standalone/common/insert/insert_different_order.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE different_order(k0 STRING, k1 STRING, v0 INTEGER, v1 INTEGER, t TIMESTAMP, time index(t), primary key(k0, k1));

INSERT INTO different_order (v1, k1, k0, t, v0) VALUES (11, 'b0', 'a0', 1, 1);

INSERT INTO different_order (v1, v0, k0, t) VALUES (12, 2, 'a1', 2);

INSERT INTO different_order (t, v1, k0, k1) VALUES (3, 13, 'a2', 'b1');

INSERT INTO different_order (t, k0, k1) VALUES (4, 'a2', 'b1');

SELECT * from different_order order by t;

SELECT * from different_order WHERE k0 = 'a2' order by t;

DROP TABLE different_order;
Loading