Skip to content

Commit

Permalink
feat(mito): Support deleting rows in mito2 (#2275)
Browse files Browse the repository at this point in the history
* feat: check delete request

* test: test delete and overwrite
  • Loading branch information
evenyag authored and waynexia committed Sep 12, 2023
1 parent 9691d19 commit b234733
Show file tree
Hide file tree
Showing 2 changed files with 412 additions and 88 deletions.
252 changes: 206 additions & 46 deletions src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ use std::collections::HashMap;

use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows};
use api::v1::{ColumnSchema, Row, Rows, SemanticType};
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest,
RegionPutRequest,
};
use store_api::storage::RegionId;

use super::*;
Expand Down Expand Up @@ -255,6 +258,26 @@ fn build_rows(start: usize, end: usize) -> Vec<Row> {
.collect()
}

fn rows_schema(request: &RegionCreateRequest) -> Vec<ColumnSchema> {
request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>()
}

async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}

#[tokio::test]
async fn test_write_to_region() {
let mut env = TestEnv::with_prefix("write-to-region");
Expand All @@ -263,29 +286,17 @@ async fn test_write_to_region() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let num_rows = 42;
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, num_rows),
rows: build_rows(0, 42),
};
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
put_rows(&engine, region_id, rows).await;
}

#[tokio::test]
Expand All @@ -298,11 +309,7 @@ async fn test_region_replay() {
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
Expand All @@ -312,27 +319,13 @@ async fn test_region_replay() {
schema: column_schemas.clone(),
rows: build_rows(0, 20),
};
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(20, rows_inserted);
put_rows(&engine, region_id, rows).await;

let rows = Rows {
schema: column_schemas,
rows: build_rows(20, 42),
};
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(22, rows_inserted);
put_rows(&engine, region_id, rows).await;

engine.stop().await.unwrap();

Expand Down Expand Up @@ -387,25 +380,189 @@ async fn test_write_query_region() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = request
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

/// Build rows to put for specific `key`.
fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec<Row> {
(start..end)
.enumerate()
.map(|(idx, ts)| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}

/// Build rows to delete for specific `key`.
fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|ts| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}

fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<ColumnSchema> {
request
.column_metadatas
.iter()
.filter(|col| col.semantic_type != SemanticType::Field)
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
.collect::<Vec<_>>()
}

async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}

#[tokio::test]
async fn test_put_delete() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
let delete_schema = delete_rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
// Delete (a, 2)
let rows = Rows {
schema: delete_schema.clone(),
rows: build_delete_rows_for_key("a", 2, 3),
};
delete_rows(&engine, region_id, rows).await;
// Delete (b, 0), (b, 1)
let rows = Rows {
schema: delete_schema,
rows: build_delete_rows_for_key("b", 0, 2),
};
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| b | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_put_overwrite() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
// Put (a, 0) => 5.0, (a, 1) => 6.0
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 5),
};
put_rows(&engine, region_id, rows).await;
// Put (b, 0) => 3.0
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 1, 3),
};
put_rows(&engine, region_id, rows).await;
// Put (b, 2) => 4.0
let rows = Rows {
schema: column_schemas,
rows: build_rows_for_key("b", 2, 3, 4),
};
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
Expand All @@ -414,9 +571,12 @@ async fn test_write_query_region() {
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| a | 5.0 | 1970-01-01T00:00:00 |
| a | 6.0 | 1970-01-01T00:00:01 |
| a | 2.0 | 1970-01-01T00:00:02 |
| b | 3.0 | 1970-01-01T00:00:00 |
| b | 1.0 | 1970-01-01T00:00:01 |
| b | 4.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
Loading

0 comments on commit b234733

Please sign in to comment.