From ac959e83cbb3cacc27968beb0e86aa6aef226da7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 10:36:58 +0800 Subject: [PATCH 1/5] test: add test for reopen --- src/mito2/src/engine/flush_test.rs | 39 ++++++++++++++++++++++++++++-- src/mito2/src/engine/open_test.rs | 23 +++--------------- src/mito2/src/region/version.rs | 4 +++ src/mito2/src/test_util.rs | 25 ++++++++++++++++++- 4 files changed, 68 insertions(+), 23 deletions(-) diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index cf348a55689e..6363066e5168 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -25,8 +25,8 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::listener::{FlushListener, StallListener}; use crate::test_util::{ - build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, - MockWriteBufferManager, TestEnv, + build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema, + CreateRequestBuilder, MockWriteBufferManager, TestEnv, }; #[tokio::test] @@ -221,3 +221,38 @@ async fn test_flush_empty() { ++"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_flush_reopen_region() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id).await; + let check_region = || { + let region = engine.get_region(region_id).unwrap(); + let version_data = region.version_control.current(); + assert_eq!(3, version_data.committed_sequence); + assert_eq!(1, version_data.last_entry_id); + assert_eq!(1, version_data.version.flushed_entry_id); + }; + check_region(); + + reopen_region(&engine, region_id, region_dir).await; + check_region(); +} diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 5d2e53c21004..e76a8c7b1ebd 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -17,11 +17,11 @@ use std::collections::HashMap; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_engine_open_empty() { @@ -84,23 +84,6 @@ async fn test_engine_reopen_region() { .await .unwrap(); - // Close the region. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); - - // Open the region again. - engine - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: String::new(), - region_dir, - options: HashMap::default(), - }), - ) - .await - .unwrap(); + reopen_region(&engine, region_id, region_dir).await; assert!(engine.is_region_exists(region_id)); } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 881d79268717..a0d71a080348 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -145,8 +145,12 @@ pub(crate) struct VersionControlData { /// Latest version. pub(crate) version: VersionRef, /// Sequence number of last committed data. + /// + /// Starts from 1 (zero means no data). pub(crate) committed_sequence: SequenceNumber, /// Last WAL entry Id. + /// + /// Starts from 1 (zero means no data). pub(crate) last_entry_id: EntryId, /// Marker of whether this region is dropped/dropping pub(crate) is_dropped: bool, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 0c02fbc4f4e4..6e55d7f22860 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -35,7 +35,8 @@ use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionPutRequest, RegionRequest, + RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, + RegionOpenRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::RegionId; @@ -566,3 +567,25 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) { }; assert_eq!(0, rows); } + +/// Reopen a region. +pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir: String) { + // Close the region. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // Open the region again. + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); +} From c61149780072716bf246b78ec5cbab9416a953eb Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 10:51:30 +0800 Subject: [PATCH 2/5] feat: last entry id starts from flushed entry id --- src/mito2/src/engine/flush_test.rs | 2 +- src/mito2/src/region/opener.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 6363066e5168..92822575405c 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -247,8 +247,8 @@ async fn test_flush_reopen_region() { let check_region = || { let region = engine.get_region(region_id).unwrap(); let version_data = region.version_control.current(); - assert_eq!(3, version_data.committed_sequence); assert_eq!(1, version_data.last_entry_id); + assert_eq!(3, version_data.committed_sequence); assert_eq!(1, version_data.version.flushed_entry_id); }; check_region(); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 8f306d0e9b8f..01aee68565a3 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -177,7 +177,9 @@ async fn replay_memtable( version_control: &VersionControlRef, ) -> Result<()> { let mut rows_replayed = 0; - let mut last_entry_id = EntryId::MIN; + // Last entry id should start from flushed entry id since there might be no + // data in the WAL. + let mut last_entry_id = flushed_entry_id; let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control); let mut wal_stream = wal.scan(region_id, flushed_entry_id)?; while let Some(res) = wal_stream.next().await { From c9f7680762cf9eb1f734b00a58a81ef26d7547a4 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 15:00:04 +0800 Subject: [PATCH 3/5] fix: store flushed sequence and recover it from manifest --- src/mito2/src/flush.rs | 7 ++++-- src/mito2/src/manifest/action.rs | 12 +++++++++- src/mito2/src/manifest/tests/checkpoint.rs | 4 +++- src/mito2/src/region/opener.rs | 1 + src/mito2/src/region/version.rs | 26 ++++++++++++++++++---- src/mito2/src/request.rs | 4 +++- src/mito2/src/worker/handle_compaction.rs | 1 + src/mito2/src/worker/handle_flush.rs | 1 + 8 files changed, 47 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index ea885bad0ea2..715021590c74 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -206,9 +206,11 @@ impl RegionFlushTask { } /// Converts the flush task into a background job. + /// + /// We must call this in the region worker. fn into_flush_job(mut self, region: &MitoRegionRef) -> Job { - // Get a version of this region before creating a job so we - // always have a consistent memtable list. + // Get a version of this region before creating a job to get current + // wal entry id, sequence and immutable memtables. let version_data = region.version_control.current(); Box::pin(async move { @@ -232,6 +234,7 @@ impl RegionFlushTask { file_metas, // The last entry has been flushed. flushed_entry_id: version_data.last_entry_id, + flushed_sequence: version_data.committed_sequence, memtables_to_remove, senders: std::mem::take(&mut self.senders), file_purger: self.file_purger.clone(), diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 6f82e581726d..812082bb44d2 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; use crate::sst::file::{FileId, FileMeta}; @@ -49,6 +49,7 @@ pub struct RegionEdit { pub files_to_remove: Vec, pub compaction_time_window: Option, pub flushed_entry_id: Option, + pub flushed_sequence: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -65,6 +66,8 @@ pub struct RegionManifest { pub files: HashMap, /// Last WAL entry id of flushed data. pub flushed_entry_id: EntryId, + /// Last sequence of flushed data. + pub flushed_sequence: SequenceNumber, /// Current manifest version. pub manifest_version: ManifestVersion, } @@ -74,6 +77,7 @@ pub struct RegionManifestBuilder { metadata: Option, files: HashMap, flushed_entry_id: EntryId, + flushed_sequence: SequenceNumber, manifest_version: ManifestVersion, } @@ -86,6 +90,7 @@ impl RegionManifestBuilder { files: s.files, flushed_entry_id: s.flushed_entry_id, manifest_version: s.manifest_version, + flushed_sequence: s.flushed_sequence, } } else { Default::default() @@ -108,6 +113,9 @@ impl RegionManifestBuilder { if let Some(flushed_entry_id) = edit.flushed_entry_id { self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id); } + if let Some(flushed_sequence) = edit.flushed_sequence { + self.flushed_sequence = self.flushed_sequence.max(flushed_sequence); + } } /// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata). @@ -121,6 +129,7 @@ impl RegionManifestBuilder { metadata, files: self.files, flushed_entry_id: self.flushed_entry_id, + flushed_sequence: self.flushed_sequence, manifest_version: self.manifest_version, }) } @@ -216,6 +225,7 @@ mod tests { let region_edit = r#"{ "flushed_entry_id":10, + "flushed_sequence":10, "compaction_time_window":null, "files_to_add":[ {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100} diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index a1a8c09338ae..9dd635b8a73a 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -58,6 +58,7 @@ fn nop_action() -> RegionMetaActionList { files_to_remove: vec![], compaction_time_window: None, flushed_entry_id: None, + flushed_sequence: None, })]) } @@ -149,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = "{\"size\":769,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + let expected_json = "{\"size\":790,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager @@ -176,6 +177,7 @@ async fn checkpoint_with_different_compression_types() { files_to_remove: vec![], compaction_time_window: None, flushed_entry_id: None, + flushed_sequence: None, })]); actions.push(action); } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 01aee68565a3..2d7c6c3cc9e3 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -152,6 +152,7 @@ impl RegionOpener { let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) + .flushed_sequence(manifest.flushed_sequence) .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index a0d71a080348..502087f3eb15 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -48,11 +48,14 @@ pub(crate) struct VersionControl { impl VersionControl { /// Returns a new [VersionControl] with specific `version`. pub(crate) fn new(version: Version) -> VersionControl { + // Initialize sequence and entry id from flushed sequence and entry id. + let (flushed_sequence, flushed_entry_id) = + (version.flushed_sequence, version.flushed_entry_id); VersionControl { data: RwLock::new(VersionControlData { version: Arc::new(version), - committed_sequence: 0, - last_entry_id: 0, + committed_sequence: flushed_sequence, + last_entry_id: flushed_entry_id, is_dropped: false, }), } @@ -172,6 +175,8 @@ pub(crate) struct Version { pub(crate) ssts: SstVersionRef, /// Inclusive max WAL entry id of flushed data. pub(crate) flushed_entry_id: EntryId, + /// Inclusive max sequence of flushed data. + pub(crate) flushed_sequence: SequenceNumber, // TODO(yingwen): RegionOptions. } @@ -183,6 +188,7 @@ pub(crate) struct VersionBuilder { memtables: MemtableVersionRef, ssts: SstVersionRef, flushed_entry_id: EntryId, + flushed_sequence: SequenceNumber, } impl VersionBuilder { @@ -193,6 +199,7 @@ impl VersionBuilder { memtables: Arc::new(MemtableVersion::new(mutable)), ssts: Arc::new(SstVersion::new()), flushed_entry_id: 0, + flushed_sequence: 0, } } @@ -203,6 +210,7 @@ impl VersionBuilder { memtables: version.memtables.clone(), ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, + flushed_sequence: version.flushed_sequence, } } @@ -224,10 +232,19 @@ impl VersionBuilder { self } + /// Sets flushed sequence. + pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self { + self.flushed_sequence = sequence; + self + } + /// Apply edit to the builder. pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { - if let Some(flushed_entry_id) = edit.flushed_entry_id { - self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id); + if let Some(entry_id) = edit.flushed_entry_id { + self.flushed_entry_id = self.flushed_entry_id.max(entry_id); + } + if let Some(sequence) = edit.flushed_sequence { + self.flushed_sequence = self.flushed_sequence.max(sequence); } if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() { let mut ssts = (*self.ssts).clone(); @@ -269,6 +286,7 @@ impl VersionBuilder { memtables: self.memtables, ssts: self.ssts, flushed_entry_id: self.flushed_entry_id, + flushed_sequence: self.flushed_sequence, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index e90ba209159b..c9b7f4c74606 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -37,7 +37,7 @@ use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, }; -use store_api::storage::{CompactionStrategy, RegionId}; +use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; @@ -525,6 +525,8 @@ pub(crate) struct FlushFinished { pub(crate) file_metas: Vec, /// Entry id of flushed data. pub(crate) flushed_entry_id: EntryId, + /// Sequence of flushed data. + pub(crate) flushed_sequence: SequenceNumber, /// Id of memtables to remove. pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, /// Flush result senders. diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index ae83a6620a07..9cecccb41425 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -65,6 +65,7 @@ impl RegionWorkerLoop { files_to_remove: std::mem::take(&mut request.compacted_files), compaction_time_window: None, // TODO(hl): update window maybe flushed_entry_id: None, + flushed_sequence: None, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); if let Err(e) = region.manifest_manager.update(action_list).await { diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index d7db92847caa..2375d8823f20 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -47,6 +47,7 @@ impl RegionWorkerLoop { files_to_remove: Vec::new(), compaction_time_window: None, flushed_entry_id: Some(request.flushed_entry_id), + flushed_sequence: Some(request.flushed_sequence), }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); if let Err(e) = region.manifest_manager.update(action_list).await { From ffb8ff8739197179207ed246b1b8f0be598cd9cd Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 17:01:33 +0800 Subject: [PATCH 4/5] test: check sequence in alter test --- src/mito2/src/engine/alter_test.rs | 11 +++++++++++ src/mito2/src/engine/flush_test.rs | 13 +++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index af01cdc4a876..8e91e6a63c74 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -93,6 +93,16 @@ async fn test_alter_region() { | | 2 | 2.0 | 1970-01-01T00:00:02 | +-------+-------+---------+---------------------+"; scan_check_after_alter(&engine, region_id, expected).await; + let check_region = |engine: &MitoEngine| { + let region = engine.get_region(region_id).unwrap(); + let version_data = region.version_control.current(); + assert_eq!(1, version_data.last_entry_id); + assert_eq!(3, version_data.committed_sequence); + assert_eq!(1, version_data.version.flushed_entry_id); + assert_eq!(1, version_data.version.flushed_entry_id); + assert_eq!(3, version_data.version.flushed_sequence); + }; + check_region(&engine); // Reopen region. let engine = env.reopen_engine(engine, MitoConfig::default()).await; @@ -108,4 +118,5 @@ async fn test_alter_region() { .await .unwrap(); scan_check_after_alter(&engine, region_id, expected).await; + check_region(&engine); } diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 92822575405c..2c76752bb5e1 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -250,9 +250,22 @@ async fn test_flush_reopen_region() { assert_eq!(1, version_data.last_entry_id); assert_eq!(3, version_data.committed_sequence); assert_eq!(1, version_data.version.flushed_entry_id); + assert_eq!(1, version_data.version.flushed_entry_id); + assert_eq!(3, version_data.version.flushed_sequence); }; check_region(); reopen_region(&engine, region_id, region_dir).await; check_region(); + + // Puts again. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 10), + }; + put_rows(&engine, region_id, rows).await; + let region = engine.get_region(region_id).unwrap(); + let version_data = region.version_control.current(); + assert_eq!(2, version_data.last_entry_id); + assert_eq!(5, version_data.committed_sequence); } From 7c485bd2c42aacc5e8829531d4fa8e732cd855e9 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 17:29:57 +0800 Subject: [PATCH 5/5] test: more tests for alter --- src/mito2/src/engine/alter_test.rs | 161 +++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 8e91e6a63c74..26faadd293e3 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -14,7 +14,8 @@ use std::collections::HashMap; -use api::v1::{Rows, SemanticType}; +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, Row, Rows, SemanticType}; use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -27,7 +28,9 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::MitoEngine; -use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; +use crate::test_util::{ + build_rows, build_rows_for_key, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { let request = ScanRequest::default(); @@ -39,6 +42,26 @@ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expect assert_eq!(expected, batches.pretty_print().unwrap()); } +fn add_tag1() -> RegionAlterRequest { + RegionAlterRequest { + schema_version: 0, + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 3, + }, + location: Some(AddColumnLocation::First), + }], + }, + } +} + #[tokio::test] async fn test_alter_region() { common_telemetry::init_default_ut_logging(); @@ -62,23 +85,7 @@ async fn test_alter_region() { }; put_rows(&engine, region_id, rows).await; - let request = RegionAlterRequest { - schema_version: 0, - kind: AlterKind::AddColumns { - columns: vec![AddColumn { - column_metadata: ColumnMetadata { - column_schema: ColumnSchema::new( - "tag_1", - ConcreteDataType::string_datatype(), - true, - ), - semantic_type: SemanticType::Tag, - column_id: 3, - }, - location: Some(AddColumnLocation::First), - }], - }, - }; + let request = add_tag1(); engine .handle_request(region_id, RegionRequest::Alter(request)) .await @@ -120,3 +127,119 @@ async fn test_alter_region() { scan_check_after_alter(&engine, region_id, expected).await; check_region(&engine); } + +/// Build rows with schema (string, f64, ts_millis, string). +fn build_rows_for_tags( + tag0: &str, + tag1: &str, + start: usize, + end: usize, + value_start: usize, +) -> Vec { + (start..end) + .enumerate() + .map(|(idx, ts)| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(tag0.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value((value_start + idx) as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(tag1.to_string())), + }, + ], + }) + .collect() +} + +#[tokio::test] +async fn test_put_after_alter() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let mut column_schemas = rows_schema(&request); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + let request = add_tag1(); + engine + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | b | 0.0 | 1970-01-01T00:00:00 | +| | b | 1.0 | 1970-01-01T00:00:01 | ++-------+-------+---------+---------------------+"; + scan_check_after_alter(&engine, region_id, expected).await; + + // Reopen region. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); + + // Put with old schema. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 2, 3, 2), + }; + put_rows(&engine, region_id, rows).await; + + // Push tag_1 to schema. + column_schemas.push(api::v1::ColumnSchema { + column_name: "tag_1".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + }); + // Put with new schema. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_tags("a", "a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + // Scan again. + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| a | a | 0.0 | 1970-01-01T00:00:00 | +| a | a | 1.0 | 1970-01-01T00:00:01 | +| | b | 0.0 | 1970-01-01T00:00:00 | +| | b | 1.0 | 1970-01-01T00:00:01 | +| | b | 2.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); +}