Skip to content

Commit

Permalink
fix(mito): Stores and recovers flushed sequence (#2355)
Browse files Browse the repository at this point in the history
* test: add test for reopen

* feat: last entry id starts from flushed entry id

* fix: store flushed sequence and recover it from manifest

* test: check sequence in alter test

* test: more tests for alter
  • Loading branch information
evenyag authored and waynexia committed Sep 12, 2023
1 parent b004c0d commit 6734c68
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 52 deletions.
172 changes: 153 additions & 19 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

use std::collections::HashMap;

use api::v1::{Rows, SemanticType};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand All @@ -27,7 +28,9 @@ use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
use crate::test_util::{
build_rows, build_rows_for_key, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
Expand All @@ -39,6 +42,26 @@ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expect
assert_eq!(expected, batches.pretty_print().unwrap());
}

/// Builds the alter request shared by the alter tests.
///
/// The request uses schema version 0 and adds a single string column named
/// `tag_1` with the `Tag` semantic type and column id 3, placed at the first
/// position of the schema.
fn add_tag1() -> RegionAlterRequest {
    let tag_1 = ColumnMetadata {
        column_schema: ColumnSchema::new("tag_1", ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id: 3,
    };
    let new_column = AddColumn {
        column_metadata: tag_1,
        location: Some(AddColumnLocation::First),
    };

    RegionAlterRequest {
        schema_version: 0,
        kind: AlterKind::AddColumns {
            columns: vec![new_column],
        },
    }
}

#[tokio::test]
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
Expand All @@ -62,23 +85,7 @@ async fn test_alter_region() {
};
put_rows(&engine, region_id, rows).await;

let request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 3,
},
location: Some(AddColumnLocation::First),
}],
},
};
let request = add_tag1();
engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
Expand All @@ -93,6 +100,16 @@ async fn test_alter_region() {
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check_after_alter(&engine, region_id, expected).await;
// Verifies the WAL entry ids and sequences tracked by the region's current
// version. Takes the engine as an argument so the same check can run against
// the reopened engine later in the test.
let check_region = |engine: &MitoEngine| {
    let region = engine.get_region(region_id).unwrap();
    let version_data = region.version_control.current();
    assert_eq!(1, version_data.last_entry_id);
    assert_eq!(3, version_data.committed_sequence);
    assert_eq!(1, version_data.version.flushed_entry_id);
    assert_eq!(3, version_data.version.flushed_sequence);
};
check_region(&engine);

// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
Expand All @@ -108,4 +125,121 @@ async fn test_alter_region() {
.await
.unwrap();
scan_check_after_alter(&engine, region_id, expected).await;
check_region(&engine);
}

/// Build rows with schema (string, f64, ts_millis, string).
fn build_rows_for_tags(
tag0: &str,
tag1: &str,
start: usize,
end: usize,
value_start: usize,
) -> Vec<Row> {
(start..end)
.enumerate()
.map(|(idx, ts)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(tag0.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
api::v1::Value {
value_data: Some(ValueData::StringValue(tag1.to_string())),
},
],
})
.collect()
}

/// Writes rows before and after adding `tag_1`, reopening the region in
/// between, and checks that rows put with the old schema and rows put with
/// the new schema are all returned by a scan.
#[tokio::test]
async fn test_put_after_alter() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();

    // Capture what we need before the create request is moved into the engine.
    let mut column_schemas = rows_schema(&create_request);
    let region_dir = create_request.region_dir.clone();
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    let rows_before_alter = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("b", 0, 2, 0),
    };
    put_rows(&engine, region_id, rows_before_alter).await;

    let alter_request = add_tag1();
    engine
        .handle_request(region_id, RegionRequest::Alter(alter_request))
        .await
        .unwrap();

    let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | b | 0.0 | 1970-01-01T00:00:00 |
| | b | 1.0 | 1970-01-01T00:00:01 |
+-------+-------+---------+---------------------+";
    scan_check_after_alter(&engine, region_id, expected).await;

    // Reopen region.
    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
    let open_request = RegionOpenRequest {
        engine: String::new(),
        region_dir,
        options: HashMap::default(),
    };
    engine
        .handle_request(region_id, RegionRequest::Open(open_request))
        .await
        .unwrap();

    // Put with old schema.
    let old_schema_rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("b", 2, 3, 2),
    };
    put_rows(&engine, region_id, old_schema_rows).await;

    // Push tag_1 to schema.
    column_schemas.push(api::v1::ColumnSchema {
        column_name: "tag_1".to_string(),
        datatype: ColumnDataType::String as i32,
        semantic_type: SemanticType::Tag as i32,
    });
    // Put with new schema.
    let new_schema_rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_tags("a", "a", 0, 2, 0),
    };
    put_rows(&engine, region_id, new_schema_rows).await;

    // Scan again.
    let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| a | a | 0.0 | 1970-01-01T00:00:00 |
| a | a | 1.0 | 1970-01-01T00:00:01 |
| | b | 0.0 | 1970-01-01T00:00:00 |
| | b | 1.0 | 1970-01-01T00:00:01 |
| | b | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
    let stream = engine
        .handle_query(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, batches.pretty_print().unwrap());
}
52 changes: 50 additions & 2 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{FlushListener, StallListener};
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
MockWriteBufferManager, TestEnv,
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};

#[tokio::test]
Expand Down Expand Up @@ -221,3 +221,51 @@ async fn test_flush_empty() {
++";
assert_eq!(expected, batches.pretty_print().unwrap());
}

/// Flushes a region, reopens it, and checks that the entry ids and sequences
/// observed right after the flush are still reported after the reopen, and
/// that a later put continues from them.
#[tokio::test]
async fn test_flush_reopen_region() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();
    let region_dir = request.region_dir.clone();

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("a", 0, 3, 0),
    };
    put_rows(&engine, region_id, rows).await;

    flush_region(&engine, region_id).await;
    // Expected state after one put of three rows followed by a flush. The
    // same closure is reused after the reopen so both states must match.
    let check_region = || {
        let region = engine.get_region(region_id).unwrap();
        let version_data = region.version_control.current();
        assert_eq!(1, version_data.last_entry_id);
        assert_eq!(3, version_data.committed_sequence);
        assert_eq!(1, version_data.version.flushed_entry_id);
        assert_eq!(3, version_data.version.flushed_sequence);
    };
    check_region();

    reopen_region(&engine, region_id, region_dir).await;
    check_region();

    // Puts again.
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("a", 0, 2, 10),
    };
    put_rows(&engine, region_id, rows).await;
    // The second put of two rows must advance the entry id and sequence
    // from the values checked above.
    let region = engine.get_region(region_id).unwrap();
    let version_data = region.version_control.current();
    assert_eq!(2, version_data.last_entry_id);
    assert_eq!(5, version_data.committed_sequence);
}
23 changes: 3 additions & 20 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::collections::HashMap;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_engine_open_empty() {
Expand Down Expand Up @@ -84,23 +84,6 @@ async fn test_engine_reopen_region() {
.await
.unwrap();

// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
reopen_region(&engine, region_id, region_dir).await;
assert!(engine.is_region_exists(region_id));
}
7 changes: 5 additions & 2 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,11 @@ impl RegionFlushTask {
}

/// Converts the flush task into a background job.
///
/// We must call this in the region worker.
fn into_flush_job(mut self, region: &MitoRegionRef) -> Job {
// Get a version of this region before creating a job so we
// always have a consistent memtable list.
// Get a version of this region before creating a job to get the current
// WAL entry id, sequence, and immutable memtables.
let version_data = region.version_control.current();

Box::pin(async move {
Expand All @@ -232,6 +234,7 @@ impl RegionFlushTask {
file_metas,
// The last entry has been flushed.
flushed_entry_id: version_data.last_entry_id,
flushed_sequence: version_data.committed_sequence,
memtables_to_remove,
senders: std::mem::take(&mut self.senders),
file_purger: self.file_purger.clone(),
Expand Down
Loading

0 comments on commit 6734c68

Please sign in to comment.