diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index d5da4f43ae13..3211cc88a1c2 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -18,10 +18,13 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; -use api::v1::{Row, Rows}; +use api::v1::{ColumnSchema, Row, Rows, SemanticType}; use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; -use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest}; +use store_api::region_request::{ + RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest, + RegionPutRequest, +}; use store_api::storage::RegionId; use super::*; @@ -255,6 +258,26 @@ fn build_rows(start: usize, end: usize) -> Vec { .collect() } +fn rows_schema(request: &RegionCreateRequest) -> Vec { + request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>() +} + +async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { + let num_rows = rows.rows.len(); + let output = engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap(); + let Output::AffectedRows(rows_inserted) = output else { + unreachable!() + }; + assert_eq!(num_rows, rows_inserted); +} + #[tokio::test] async fn test_write_to_region() { let mut env = TestEnv::with_prefix("write-to-region"); @@ -263,29 +286,17 @@ async fn test_write_to_region() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); - let column_schemas = request - .column_metadatas - .iter() - .map(column_metadata_to_column_schema) - .collect::>(); + let column_schemas = rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); - let num_rows = 42; let rows = Rows { schema: column_schemas, - rows: build_rows(0, num_rows), + rows: build_rows(0, 42), }; - let output = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) - .await - .unwrap(); - let Output::AffectedRows(rows_inserted) = output else { - unreachable!() - }; - assert_eq!(num_rows, rows_inserted); + put_rows(&engine, region_id, rows).await; } #[tokio::test] @@ -298,11 +309,7 @@ async fn test_region_replay() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); - let column_schemas = request - .column_metadatas - .iter() - .map(column_metadata_to_column_schema) - .collect::>(); + let column_schemas = rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) .await @@ -312,27 +319,13 @@ async fn test_region_replay() { schema: column_schemas.clone(), rows: build_rows(0, 20), }; - let output = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) - .await - .unwrap(); - let Output::AffectedRows(rows_inserted) = output else { - unreachable!() - }; - assert_eq!(20, rows_inserted); + put_rows(&engine, region_id, rows).await; let rows = Rows { schema: column_schemas, rows: build_rows(20, 42), }; - let output = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) - .await - .unwrap(); - let Output::AffectedRows(rows_inserted) = output else { - unreachable!() - }; - assert_eq!(22, rows_inserted); + put_rows(&engine, region_id, rows).await; engine.stop().await.unwrap(); @@ -387,25 +380,189 @@ async fn test_write_query_region() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); - let column_schemas = request + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let scanner = engine.handle_query(region_id, request).unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +/// Build rows to put for specific `key`. +fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec { + (start..end) + .enumerate() + .map(|(idx, ts)| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value((value_start + idx) as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }, + ], + }) + .collect() +} + +/// Build rows to delete for specific `key`. +fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec { + (start..end) + .map(|ts| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }, + ], + }) + .collect() +} + +fn delete_rows_schema(request: &RegionCreateRequest) -> Vec { + request .column_metadatas .iter() + .filter(|col| col.semantic_type != SemanticType::Field) .map(column_metadata_to_column_schema) - .collect::>(); + .collect::>() +} + +async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { + let num_rows = rows.rows.len(); + let output = engine + .handle_request( + region_id, + RegionRequest::Delete(RegionDeleteRequest { rows }), + ) + .await + .unwrap(); + let Output::AffectedRows(rows_inserted) = output else { + unreachable!() + }; + assert_eq!(num_rows, rows_inserted); +} + +#[tokio::test] +async fn test_put_delete() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + let delete_schema = delete_rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); let rows = Rows { - schema: column_schemas, - rows: build_rows(0, 3), + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + // Delete (a, 2) + let rows = Rows { + schema: delete_schema.clone(), + rows: build_delete_rows_for_key("a", 2, 3), + }; + delete_rows(&engine, region_id, rows).await; + // Delete (b, 0), (b, 1) + let rows = Rows { + schema: delete_schema, + rows: build_delete_rows_for_key("b", 0, 2), }; + delete_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let scanner = engine.handle_query(region_id, request).unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| b | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_put_overwrite() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + // Put (a, 0) => 5.0, (a, 1) => 6.0 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 5), + }; + put_rows(&engine, region_id, rows).await; + // Put (b, 0) => 3.0 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 1, 3), + }; + put_rows(&engine, region_id, rows).await; + // Put (b, 2) => 4.0 + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("b", 2, 3, 4), + }; + put_rows(&engine, region_id, rows).await; + let request = ScanRequest::default(); let scanner = engine.handle_query(region_id, request).unwrap(); let stream = scanner.scan().await.unwrap(); @@ -414,9 +571,12 @@ async fn test_write_query_region() { +-------+---------+---------------------+ | tag_0 | field_0 | ts | +-------+---------+---------------------+ -| 0 | 0.0 | 1970-01-01T00:00:00 | -| 1 | 1.0 | 1970-01-01T00:00:01 | -| 2 | 2.0 | 1970-01-01T00:00:02 | +| a | 5.0 | 1970-01-01T00:00:00 | +| a | 6.0 | 1970-01-01T00:00:01 | +| a | 2.0 | 1970-01-01T00:00:02 | +| b | 3.0 | 1970-01-01T00:00:00 | +| b | 1.0 | 1970-01-01T00:00:01 | +| b | 4.0 | 1970-01-01T00:00:02 | +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 7b139ff4d844..77716153eacc 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -21,9 +21,10 @@ use api::helper::{ is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, to_proto_value, }; -use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value}; +use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_base::readable_size::ReadableSize; use common_query::Output; +use datatypes::prelude::DataType; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ @@ -124,17 +125,18 @@ impl WriteRequest { }) } - /// Get column index by name. + /// Gets column index by name. pub(crate) fn column_index_by_name(&self, name: &str) -> Option { self.name_to_index.get(name).copied() } - // TODO(yingwen): Check delete schema. /// Checks schema of rows is compatible with schema of the region. /// /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) /// error. pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> { + debug_assert_eq!(self.region_id, metadata.region_id); + let region_id = self.region_id; // Index all columns in rows. let mut rows_columns: HashMap<_, _> = self @@ -192,15 +194,8 @@ impl WriteRequest { } ); } else { - // For columns not in rows, checks whether they have default value. - ensure!( - column.column_schema.is_nullable() - || column.column_schema.default_constraint().is_some(), - InvalidRequestSnafu { - region_id, - reason: format!("missing column {}", column.column_schema.name), - } - ); + // Rows don't have this column. + self.check_missing_column(column)?; return FillDefaultSnafu { region_id }.fail(); } @@ -219,48 +214,26 @@ impl WriteRequest { Ok(()) } - /// Try to fill missing columns. + /// Tries to fill missing columns. /// /// Currently, our protobuf format might be inefficient when we need to fill lots of null /// values. pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> { + debug_assert_eq!(self.region_id, metadata.region_id); + for column in &metadata.column_metadatas { if !self.name_to_index.contains_key(&column.column_schema.name) { - self.fill_column(metadata.region_id, column)?; + self.fill_column(column)?; } } Ok(()) } - /// Fill default value for specific `column`. - fn fill_column(&mut self, region_id: RegionId, column: &ColumnMetadata) -> Result<()> { + /// Fills default value for specific `column`. + fn fill_column(&mut self, column: &ColumnMetadata) -> Result<()> { // Need to add a default value for this column. - let default_value = column - .column_schema - .create_default() - .context(CreateDefaultSnafu { - region_id, - column: &column.column_schema.name, - })? - // This column doesn't have default value. - .with_context(|| InvalidRequestSnafu { - region_id, - reason: format!( - "column {} does not have default value", - column.column_schema.name - ), - })?; - - // Convert default value into proto's value. - - let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu { - region_id, - reason: format!( - "no protobuf type for default value of column {} ({:?})", - column.column_schema.name, column.column_schema.data_type - ), - })?; + let proto_value = self.column_default_value(column)?; // Insert default value to each row. for row in &mut self.rows.rows { @@ -270,7 +243,7 @@ impl WriteRequest { // Insert column schema. let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| { InvalidRequestSnafu { - region_id, + region_id: self.region_id, reason: format!( "no protobuf type for column {} ({:?})", column.column_schema.name, column.column_schema.data_type @@ -285,6 +258,86 @@ impl WriteRequest { Ok(()) } + + /// Checks whether we should allow a row doesn't provide this column. + fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> { + // For delete request, all tags and timestamp is required. We don't fill default + // tag or timestamp while deleting rows. + ensure!( + self.op_type != OpType::Delete || column.semantic_type == SemanticType::Field, + InvalidRequestSnafu { + region_id: self.region_id, + reason: format!("delete requests need column {}", column.column_schema.name), + } + ); + + // Checks whether they have default value. + ensure!( + column.column_schema.is_nullable() + || column.column_schema.default_constraint().is_some(), + InvalidRequestSnafu { + region_id: self.region_id, + reason: format!("missing column {}", column.column_schema.name), + } + ); + + Ok(()) + } + + /// Returns the default value for specific column. + fn column_default_value(&self, column: &ColumnMetadata) -> Result { + let default_value = match self.op_type { + OpType::Delete => { + ensure!( + column.semantic_type == SemanticType::Field, + InvalidRequestSnafu { + region_id: self.region_id, + reason: format!( + "delete requests need column {}", + column.column_schema.name + ), + } + ); + + // For delete request, we need a default value for padding so we + // can delete a row even a field doesn't have a default value. So the + // value doesn't need to following the default value constraint of the + // column. + if column.column_schema.is_nullable() { + datatypes::value::Value::Null + } else { + column.column_schema.data_type.default_value() + } + } + OpType::Put => { + // For put requests, we use the default value from column schema. + column + .column_schema + .create_default() + .context(CreateDefaultSnafu { + region_id: self.region_id, + column: &column.column_schema.name, + })? + // This column doesn't have default value. + .with_context(|| InvalidRequestSnafu { + region_id: self.region_id, + reason: format!( + "column {} does not have default value", + column.column_schema.name + ), + })? + } + }; + + // Convert default value into proto's value. + to_proto_value(default_value).with_context(|| InvalidRequestSnafu { + region_id: self.region_id, + reason: format!( + "no protobuf type for default value of column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + }) + } } /// Validate proto value schema. @@ -406,6 +459,7 @@ impl RequestBody { mod tests { use api::v1::{Row, SemanticType}; use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnDefaultConstraint; use store_api::metadata::RegionMetadataBuilder; use super::*; @@ -670,6 +724,116 @@ mod tests { assert_eq!(expect_rows, request.rows); } + fn region_metadata_for_delete() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "f0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 3, + }) + // Column is not nullable. + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "f1", + ConcreteDataType::int64_datatype(), + false, + ) + .with_default_constraint(Some(ColumnDefaultConstraint::Value( + datatypes::value::Value::Int64(100), + ))) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 4, + }) + .primary_key(vec![2]); + builder.build().unwrap() + } + + #[test] + fn test_fill_missing_for_delete() { + let rows = Rows { + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], + rows: vec![Row { + values: vec![ts_ms_value(1)], + }], + }; + let metadata = region_metadata_for_delete(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "delete requests need column k0"); + let err = request.fill_missing_columns(&metadata).unwrap_err(); + check_invalid_request(&err, "delete requests need column k0"); + + let rows = Rows { + schema: vec![ + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + ], + rows: vec![Row { + values: vec![i64_value(100), ts_ms_value(1)], + }], + }; + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + request.fill_missing_columns(&metadata).unwrap(); + + let expect_rows = Rows { + schema: vec![ + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field), + new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field), + ], + // Column f1 is not nullable and we use 0 for padding. + rows: vec![Row { + values: vec![ + i64_value(100), + ts_ms_value(1), + Value { value_data: None }, + i64_value(0), + ], + }], + }; + assert_eq!(expect_rows, request.rows); + } + #[test] fn test_no_default() { let rows = Rows {