From 5bfc56ab14563602e8ae37fe2439615a356e603a Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 11 Sep 2023 22:08:23 +0800 Subject: [PATCH] feat(mito): Implement skeleton for alteration (#2343) * feat: impl handle_alter wip * refactor: move send_result to worker.rs * feat: skeleton for handle_alter_request * feat: write requests should wait for alteration * feat: define alter request * chore: no warnings * fix: remove memtables after flush * chore: update comments and impl add_write_request_to_pending * feat: add schema version to RegionMetadata * feat: impl alter_schema/can_alter_directly * chore: use send_result * test: pull next_batch again * feat: convert pb AlterRequest to RegionAlterRequest * feat: validate alter request * feat: validate request and alter metadata * feat: allow none location * test: test alter * fix: recover files and flushed entry id from manifest * test: test alter * chore: change comments and variables * chore: fix compiler errors * feat: add is_empty() to MemtableVersion * test: fix metadata alter test * fix: Compaction picker doesn't notify waiters if it returns None * chore: address CR comments * test: add tests for alter request * refactor: use send_result --- src/mito2/src/compaction/twcs.rs | 20 +- src/mito2/src/engine.rs | 2 + src/mito2/src/engine/alter_test.rs | 111 ++++ src/mito2/src/engine/compaction_test.rs | 13 +- src/mito2/src/engine/flush_test.rs | 6 +- src/mito2/src/error.rs | 7 + src/mito2/src/flush.rs | 76 ++- src/mito2/src/manifest/action.rs | 4 +- src/mito2/src/manifest/tests/checkpoint.rs | 2 +- src/mito2/src/memtable/version.rs | 22 +- src/mito2/src/read/scan_region.rs | 12 + src/mito2/src/read/seq_scan.rs | 5 + src/mito2/src/region/opener.rs | 16 +- src/mito2/src/region/version.rs | 78 ++- src/mito2/src/test_util.rs | 21 +- src/mito2/src/worker.rs | 18 +- src/mito2/src/worker/handle_alter.rs | 139 +++++ src/mito2/src/worker/handle_compaction.rs | 8 +- src/mito2/src/worker/handle_flush.rs | 30 +- src/mito2/src/worker/handle_write.rs | 21 +- src/store-api/src/metadata.rs | 206 +++++++- src/store-api/src/region_request.rs | 572 +++++++++++++++++++-- 22 files changed, 1239 insertions(+), 150 deletions(-) create mode 100644 src/mito2/src/engine/alter_test.rs create mode 100644 src/mito2/src/worker/handle_alter.rs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index a3e1ca7cd34a..c0098319e2aa 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -39,6 +39,7 @@ use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, Wor use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::LevelMeta; +use crate::worker::send_result; const MAX_PARALLEL_COMPACTION: usize = 8; @@ -122,7 +123,7 @@ impl Picker for TwcsPicker { ttl, compaction_time_window, request_sender, - waiter: waiters, + waiter, file_purger, } = req; @@ -155,6 +156,8 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { + // Nothing to compact. + send_result(waiter, Ok(Output::AffectedRows(0))); return None; } let task = TwcsCompactionTask { @@ -166,7 +169,7 @@ impl Picker for TwcsPicker { sst_write_buffer_size: ReadableSize::mb(4), compaction_time_window: None, request_sender, - sender: waiters, + sender: waiter, file_purger, }; Some(Box::new(task)) @@ -267,7 +270,8 @@ impl TwcsCompactionTask { compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); info!( - "Compaction output [{}]-> {}", + "Compaction region {} output [{}]-> {}", + self.region_id, output .inputs .iter() @@ -315,15 +319,6 @@ impl TwcsCompactionTask { Ok((output, compacted)) } - /// Handles compaction success. - fn early_success(&mut self) { - if let Some(sender) = self.sender.take() { - let _ = sender.send(Ok(Output::AffectedRows(0)).context(CompactRegionSnafu { - region_id: self.region_id, - })); - } - } - /// Handles compaction failure, notifies all waiters. fn on_failure(&mut self, err: Arc) { if let Some(sender) = self.sender.take() { @@ -353,7 +348,6 @@ impl CompactionTask for TwcsCompactionTask { "Compacted SST files, input: {:?}, output: {:?}, window: {:?}", added, deleted, self.compaction_time_window ); - self.early_success(); BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.region_id, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d2266be28938..aed6f399a524 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -14,6 +14,8 @@ //! Mito region engine. +#[cfg(test)] +mod alter_test; #[cfg(test)] mod close_test; #[cfg(test)] diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs new file mode 100644 index 000000000000..af01cdc4a876 --- /dev/null +++ b/src/mito2/src/engine/alter_test.rs @@ -0,0 +1,111 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::{Rows, SemanticType}; +use common_recordbatch::RecordBatches; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest, +}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; + +async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(0, scanner.num_memtables()); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_alter_region() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let request = RegionAlterRequest { + schema_version: 0, + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 3, + }, + location: Some(AddColumnLocation::First), + }], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | 0 | 0.0 | 1970-01-01T00:00:00 | +| | 1 | 1.0 | 1970-01-01T00:00:01 | +| | 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------------------+"; + scan_check_after_alter(&engine, region_id, expected).await; + + // Reopen region. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); + scan_check_after_alter(&engine, region_id, expected).await; +} diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 584d21e7b544..9c196d78b900 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::ops::Range; use api::v1::{ColumnSchema, Rows}; @@ -29,7 +28,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::test_util::{ - build_rows, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv, + build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv, }; async fn put_and_flush( @@ -40,7 +39,7 @@ async fn put_and_flush( ) { let rows = Rows { schema: column_schemas.to_vec(), - rows: build_rows(rows.start, rows.end), + rows: build_rows_for_key("a", rows.start, rows.end, 0), }; put_rows(engine, region_id, rows).await; @@ -63,7 +62,7 @@ async fn delete_and_flush( let row_cnt = rows.len(); let rows = Rows { schema: column_schemas.to_vec(), - rows: build_rows(rows.start, rows.end), + rows: build_rows_for_key("a", rows.start, rows.end, 0), }; let deleted = engine @@ -89,8 +88,8 @@ async fn delete_and_flush( assert_eq!(0, rows); } -async fn collect_stream_ts(stream: SendableRecordBatchStream) -> HashSet { - let mut res = HashSet::new(); +async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec { + let mut res = Vec::new(); let batches = RecordBatches::try_collect(stream).await.unwrap(); for batch in batches { let ts_col = batch @@ -139,5 +138,5 @@ async fn test_compaction_region() { .unwrap(); let vec = collect_stream_ts(stream).await; - assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); + assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); } diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index c2a04a971fa6..cf348a55689e 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -53,6 +53,7 @@ async fn test_manual_flush() { let request = ScanRequest::default(); let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -98,7 +99,7 @@ async fn test_flush_engine() { write_buffer_manager.set_should_flush(true); - // Writes and triggers flush. + // Writes to the mutable memtable and triggers flush. let rows = Rows { schema: column_schemas.clone(), rows: build_rows_for_key("b", 0, 2, 0), @@ -110,6 +111,7 @@ async fn test_flush_engine() { let request = ScanRequest::default(); let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -174,6 +176,7 @@ async fn test_write_stall() { let request = ScanRequest::default(); let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -209,6 +212,7 @@ async fn test_flush_empty() { let request = ScanRequest::default(); let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(0, scanner.num_memtables()); assert_eq!(0, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index bd17311f429d..07630e3eb790 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -468,6 +468,12 @@ pub enum Error { reason: String, location: Location, }, + + #[snafu(display("{}, location: {}", source, location))] + InvalidRegionRequest { + source: store_api::metadata::MetadataError, + location: Location, + }, } pub type Result = std::result::Result; @@ -537,6 +543,7 @@ impl ErrorExt for Error { RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, + InvalidRegionRequest { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 11f08b5d5f6e..ea885bad0ea2 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -31,12 +31,14 @@ use crate::read::Source; use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; use crate::request::{ - BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, WorkerRequest, + BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, SenderWriteRequest, + WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; +use crate::worker::send_result; /// Global write buffer (memtable) manager. /// @@ -165,7 +167,8 @@ pub enum FlushReason { EngineFull, /// Manual flush. Manual, - // TODO(yingwen): Alter. + /// Flush to alter table. + Alter, } /// Task to flush a region. @@ -357,13 +360,13 @@ impl FlushScheduler { // Checks whether we can flush the region now. if flush_status.flushing { // There is already a flush job running. - flush_status.push_task(task); + flush_status.merge_task(task); return Ok(()); } // If there are pending tasks, then we should push it to pending list. if flush_status.pending_task.is_some() { - flush_status.push_task(task); + flush_status.merge_task(task); return Ok(()); } @@ -393,7 +396,7 @@ impl FlushScheduler { pub(crate) fn on_flush_success( &mut self, region_id: RegionId, - ) -> Option> { + ) -> Option<(Vec, Vec)> { let Some(flush_status) = self.region_status.get_mut(®ion_id) else { return None; }; @@ -401,11 +404,11 @@ impl FlushScheduler { // This region doesn't have running flush job. flush_status.flushing = false; - let pending_ddls = if flush_status.pending_task.is_none() { + let pending_requests = if flush_status.pending_task.is_none() { // The region doesn't have any pending flush task. // Safety: The flush status exists. let flush_status = self.region_status.remove(®ion_id).unwrap(); - Some(flush_status.pending_ddls) + Some((flush_status.pending_ddls, flush_status.pending_writes)) } else { None }; @@ -415,7 +418,7 @@ impl FlushScheduler { error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id); } - pending_ddls + pending_requests } /// Notifies the scheduler that the flush job is finished. @@ -460,17 +463,31 @@ impl FlushScheduler { /// Add ddl request to pending queue. /// - /// Returns error if region doesn't request flush. - pub(crate) fn add_ddl_request_to_pending( - &mut self, - request: SenderDdlRequest, - ) -> Result<(), SenderDdlRequest> { - if let Some(status) = self.region_status.get_mut(&request.region_id) { - status.pending_ddls.push(request); - return Ok(()); - } + /// # Panics + /// Panics if region didn't request flush. + pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) { + let status = self.region_status.get_mut(&request.region_id).unwrap(); + status.pending_ddls.push(request); + } - Err(request) + /// Add write request to pending queue. + /// + /// # Panics + /// Panics if region didn't request flush. + pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) { + let status = self + .region_status + .get_mut(&request.request.region_id) + .unwrap(); + status.pending_writes.push(request); + } + + /// Returns true if the region has pending DDLs. + pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool { + self.region_status + .get(®ion_id) + .map(|status| !status.pending_ddls.is_empty()) + .unwrap_or(false) } /// Schedules a new flush task when the scheduler can submit next task. @@ -508,6 +525,8 @@ struct FlushStatus { pending_task: Option, /// Pending ddl requests. pending_ddls: Vec, + /// Requests waiting to write after altering the region. + pending_writes: Vec, } impl FlushStatus { @@ -517,10 +536,12 @@ impl FlushStatus { flushing: false, pending_task: None, pending_ddls: Vec::new(), + pending_writes: Vec::new(), } } - fn push_task(&mut self, task: RegionFlushTask) { + /// Merges the task to pending task. + fn merge_task(&mut self, task: RegionFlushTask) { if let Some(pending) = &mut self.pending_task { pending.merge(task); } else { @@ -533,11 +554,20 @@ impl FlushStatus { task.on_failure(err.clone()); } for ddl in self.pending_ddls { - if let Some(sender) = ddl.sender { - let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + send_result( + ddl.sender, + Err(err.clone()).context(FlushRegionSnafu { region_id: self.region.region_id, - })); - } + }), + ); + } + for write_req in self.pending_writes { + send_result( + write_req.sender, + Err(err.clone()).context(FlushRegionSnafu { + region_id: self.region.region_id, + }), + ); } } } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 0e24e8fd8790..6f82e581726d 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -232,7 +232,9 @@ mod tests { {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3} ], "primary_key":[1], - "region_id":5299989648942} + "region_id":5299989648942, + "schema_version":0 + } }"#; let _ = serde_json::from_str::(region_change).unwrap(); diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index d06b5fb2a667..a1a8c09338ae 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -149,7 +149,7 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = "{\"size\":750,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + let expected_json = "{\"size\":769,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 19a9da4a9dcf..f512111822ae 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use smallvec::SmallVec; -use crate::memtable::MemtableRef; +use crate::memtable::{MemtableId, MemtableRef}; /// A version of current memtables in a region. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct MemtableVersion { /// Mutable memtable. pub(crate) mutable: MemtableRef, @@ -85,8 +85,26 @@ impl MemtableVersion { }) } + /// Removes memtables by ids from immutable memtables. + pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) { + self.immutables = self + .immutables + .iter() + .filter(|mem| !ids.contains(&mem.id())) + .cloned() + .collect(); + } + /// Returns the memory usage of the mutable memtable. pub(crate) fn mutable_usage(&self) -> usize { self.mutable.stats().estimated_bytes } + + /// Returns true if the memtable version is empty. + /// + /// The version is empty when mutable memtable is empty and there is no + /// immutable memtables. + pub(crate) fn is_empty(&self) -> bool { + self.mutable.is_empty() && self.immutables.is_empty() + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 6c743675ba1f..752821834e94 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -52,6 +52,13 @@ impl Scanner { Scanner::Seq(seq_scan) => seq_scan.num_files(), } } + + /// Returns number of memtables to scan. + pub(crate) fn num_memtables(&self) -> usize { + match self { + Scanner::Seq(seq_scan) => seq_scan.num_memtables(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -139,6 +146,11 @@ impl ScanRegion { } let memtables = self.version.memtables.list_memtables(); + // Skip empty memtables. + let memtables: Vec<_> = memtables + .into_iter() + .filter(|mem| !mem.is_empty()) + .collect(); debug!( "Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}", diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9e34bee476a6..28e5249fecc8 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -147,6 +147,11 @@ impl SeqScan { #[cfg(test)] impl SeqScan { + /// Returns number of memtables to scan. + pub(crate) fn num_memtables(&self) -> usize { + self.memtables.len() + } + /// Returns number of SST files to scan. pub(crate) fn num_files(&self) -> usize { self.files.len() diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 7fc33f759135..8f306d0e9b8f 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -146,19 +146,23 @@ impl RegionOpener { ); let region_id = metadata.region_id; + let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); + let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone())); let mutable = self.memtable_builder.build(&metadata); - let version = VersionBuilder::new(metadata, mutable).build(); - let flushed_sequence = version.flushed_entry_id; + let version = VersionBuilder::new(metadata, mutable) + .add_files(file_purger.clone(), manifest.files.values().cloned()) + .flushed_entry_id(manifest.flushed_entry_id) + .build(); + let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); - replay_memtable(wal, region_id, flushed_sequence, &version_control).await?; - let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); + replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?; let region = MitoRegion { region_id: self.region_id, version_control, - access_layer: access_layer.clone(), + access_layer, manifest_manager, - file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)), + file_purger, last_flush_millis: AtomicI64::new(current_time_millis()), }; Ok(region) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index fc6b281216b4..881d79268717 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -30,7 +30,8 @@ use store_api::storage::SequenceNumber; use crate::manifest::action::RegionEdit; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; -use crate::memtable::{MemtableBuilderRef, MemtableRef}; +use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::sst::file::FileMeta; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::{SstVersion, SstVersionRef}; use crate::wal::EntryId; @@ -90,11 +91,17 @@ impl VersionControl { } /// Apply edit to current version. - pub(crate) fn apply_edit(&self, edit: RegionEdit, purger: FilePurgerRef) { + pub(crate) fn apply_edit( + &self, + edit: RegionEdit, + memtables_to_remove: &[MemtableId], + purger: FilePurgerRef, + ) { let version = self.current().version; let new_version = Arc::new( VersionBuilder::from_version(version) .apply_edit(edit, purger) + .remove_memtables(memtables_to_remove) .build(), ); @@ -108,6 +115,26 @@ impl VersionControl { data.is_dropped = true; data.version.ssts.mark_all_deleted(); } + + /// Alter schema of the region. + /// + /// It replaces existing mutable memtable with a memtable that uses the + /// new schema. Memtables of the version must be empty. + pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) { + let new_mutable = builder.build(&metadata); + let version = self.current().version; + debug_assert!(version.memtables.mutable.is_empty()); + debug_assert!(version.memtables.immutables().is_empty()); + let new_version = Arc::new( + VersionBuilder::from_version(version) + .metadata(metadata) + .memtables(MemtableVersion::new(new_mutable)) + .build(), + ); + + let mut version_data = self.data.write().unwrap(); + version_data.version = new_version; + } } pub(crate) type VersionControlRef = Arc; @@ -156,7 +183,7 @@ pub(crate) struct VersionBuilder { impl VersionBuilder { /// Returns a new builder. - pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> VersionBuilder { + pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self { VersionBuilder { metadata, memtables: Arc::new(MemtableVersion::new(mutable)), @@ -166,7 +193,7 @@ impl VersionBuilder { } /// Returns a new builder from an existing version. - pub(crate) fn from_version(version: VersionRef) -> VersionBuilder { + pub(crate) fn from_version(version: VersionRef) -> Self { VersionBuilder { metadata: version.metadata.clone(), memtables: version.memtables.clone(), @@ -176,17 +203,25 @@ impl VersionBuilder { } /// Sets memtables. - pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> VersionBuilder { + pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self { self.memtables = Arc::new(memtables); self } + /// Sets metadata. + pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self { + self.metadata = metadata; + self + } + + /// Sets flushed entry id. + pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self { + self.flushed_entry_id = entry_id; + self + } + /// Apply edit to the builder. - pub(crate) fn apply_edit( - mut self, - edit: RegionEdit, - file_purger: FilePurgerRef, - ) -> VersionBuilder { + pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(flushed_entry_id) = edit.flushed_entry_id { self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id); } @@ -200,6 +235,29 @@ impl VersionBuilder { self } + /// Remove memtables from the builder. + pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self { + if !ids.is_empty() { + let mut memtables = (*self.memtables).clone(); + memtables.remove_memtables(ids); + self.memtables = Arc::new(memtables); + } + self + } + + /// Add files to the builder. + pub(crate) fn add_files( + mut self, + file_purger: FilePurgerRef, + files: impl Iterator, + ) -> Self { + let mut ssts = (*self.ssts).clone(); + ssts.add_files(file_purger, files); + self.ssts = Arc::new(ssts); + + self + } + /// Builds a new [Version] from the builder. pub(crate) fn build(self) -> Version { Version { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index cbc2e8896ed2..0c02fbc4f4e4 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -137,25 +137,6 @@ impl TestEnv { ) } - /// Reopen the engine. - pub async fn reopen_engine_with( - &self, - engine: MitoEngine, - config: MitoConfig, - manager: WriteBufferManagerRef, - listener: Option, - ) -> MitoEngine { - engine.stop().await.unwrap(); - - MitoEngine::new_for_test( - config, - self.logstore.clone().unwrap(), - self.object_store.clone().unwrap(), - manager, - listener, - ) - } - /// Creates a new [WorkerGroup] with specific config under this env. pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store) = self.create_log_and_object_store().await; @@ -404,6 +385,8 @@ pub async fn check_reader_result(reader: &mut R, expect: &[Batch } assert_eq!(expect, result); + // Next call to `next_batch()` still returns None. + assert!(reader.next_batch().await.unwrap().is_none()); } /// A mock [WriteBufferManager] that supports controlling whether to flush/stall. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3c9f4e7803ce..c8ed7dcfbb94 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -14,6 +14,7 @@ //! Structs and utilities for writing regions. +mod handle_alter; mod handle_close; mod handle_compaction; mod handle_create; @@ -27,6 +28,7 @@ use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use common_query::Output; use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; @@ -35,7 +37,7 @@ use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; @@ -175,6 +177,14 @@ impl WorkerGroup { } } +/// Send result to the sender. +pub(crate) fn send_result(sender: Option>>, res: Result) { + if let Some(sender) = sender { + // Ignore send result. + let _ = sender.send(res); + } +} + // Tests methods. #[cfg(test)] impl WorkerGroup { @@ -489,7 +499,11 @@ impl RegionWorkerLoop { DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await, DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, - DdlRequest::Alter(_) => todo!(), + DdlRequest::Alter(req) => { + self.handle_alter_request(ddl.region_id, req, ddl.sender) + .await; + continue; + } DdlRequest::Flush(_) => { self.handle_flush_request(ddl.region_id, ddl.sender).await; continue; diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs new file mode 100644 index 000000000000..49c93a970504 --- /dev/null +++ b/src/mito2/src/worker/handle_alter.rs @@ -0,0 +1,139 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling alter related requests. + +use std::sync::Arc; + +use common_query::Output; +use common_telemetry::{error, info}; +use snafu::ResultExt; +use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::region_request::RegionAlterRequest; +use store_api::storage::RegionId; +use tokio::sync::oneshot; + +use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result}; +use crate::flush::FlushReason; +use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; +use crate::memtable::MemtableBuilderRef; +use crate::region::version::Version; +use crate::region::MitoRegionRef; +use crate::request::{DdlRequest, SenderDdlRequest}; +use crate::worker::{send_result, RegionWorkerLoop}; + +impl RegionWorkerLoop { + pub(crate) async fn handle_alter_request( + &mut self, + region_id: RegionId, + request: RegionAlterRequest, + sender: Option>>, + ) { + let Some(region) = self.regions.get_region(region_id) else { + send_result(sender, RegionNotFoundSnafu { region_id }.fail()); + return; + }; + + info!("Try to alter region: {}, request: {:?}", region_id, request); + + // Get the version before alter. + let version = region.version(); + // Checks whether we can alter the region directly. + if !version.memtables.is_empty() { + // If memtable is not empty, we can't alter it directly and need to flush + // all memtables first. + info!("Flush region: {} before alteration", region_id); + + // Try to submit a flush task. + let task = self.new_flush_task(®ion, FlushReason::Alter); + if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + // Unable to flush the region, send error to waiter. + send_result(sender, Err(e)); + return; + } + + // Safety: We have requested flush. + self.flush_scheduler + .add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender, + request: DdlRequest::Alter(request), + }); + + return; + } + + // Now we can alter the region directly. + if let Err(e) = + alter_region_schema(®ion, &version, request, &self.memtable_builder).await + { + error!(e; "Failed to alter region schema, region_id: {}", region_id); + send_result(sender, Err(e)); + return; + } + + info!( + "Schema of region {} is altered from {} to {}", + region_id, + version.metadata.schema_version, + region.metadata().schema_version + ); + + // Notifies waiters. + send_result(sender, Ok(Output::AffectedRows(0))); + } +} + +/// Alter the schema of the region. +async fn alter_region_schema( + region: &MitoRegionRef, + version: &Version, + request: RegionAlterRequest, + builder: &MemtableBuilderRef, +) -> Result<()> { + let new_meta = metadata_after_alteration(&version.metadata, request)?; + // Persist the metadata to region's manifest. + let change = RegionChange { + metadata: new_meta.clone(), + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); + region.manifest_manager.update(action_list).await?; + + // Apply the metadata to region's version. + region.version_control.alter_schema(new_meta, builder); + Ok(()) +} + +/// Creates a metadata after applying the alter `request` to the old `metadata`. +/// +/// Returns an error if the `request` is invalid. +fn metadata_after_alteration( + metadata: &RegionMetadata, + request: RegionAlterRequest, +) -> Result { + // Validates request. + request + .validate(metadata) + .context(InvalidRegionRequestSnafu)?; + + let mut builder = RegionMetadataBuilder::from_existing(metadata.clone()); + builder + .alter(request.kind) + .context(InvalidRegionRequestSnafu)? + .bump_version(); + let new_meta = builder.build().context(InvalidMetadataSnafu)?; + assert_eq!(request.schema_version + 1, new_meta.schema_version); + + Ok(Arc::new(new_meta)) +} diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 052bc419b1a4..ae83a6620a07 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -23,7 +23,7 @@ use crate::error::{RegionNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished}; -use crate::worker::RegionWorkerLoop; +use crate::worker::{send_result, RegionWorkerLoop}; impl RegionWorkerLoop { /// Handles compaction request submitted to region worker. @@ -33,9 +33,7 @@ impl RegionWorkerLoop { sender: Option>>, ) { let Some(region) = self.regions.get_region(region_id) else { - if let Some(sender) = sender { - let _ = sender.send(RegionNotFoundSnafu { region_id }.fail()); - } + send_result(sender, RegionNotFoundSnafu { region_id }.fail()); return; }; @@ -78,7 +76,7 @@ impl RegionWorkerLoop { // Apply edit to region's version. region .version_control - .apply_edit(edit, region.file_purger.clone()); + .apply_edit(edit, &[], region.file_purger.clone()); request.on_success(); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 71685c70b7b6..d7db92847caa 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -26,7 +26,7 @@ use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished}; -use crate::worker::RegionWorkerLoop; +use crate::worker::{send_result, RegionWorkerLoop}; impl RegionWorkerLoop { /// On region flush job finished. @@ -56,9 +56,11 @@ impl RegionWorkerLoop { } // Apply edit to region's version. - region - .version_control - .apply_edit(edit, region.file_purger.clone()); + region.version_control.apply_edit( + edit, + &request.memtables_to_remove, + region.file_purger.clone(), + ); region.update_flush_millis(); // Delete wal. @@ -75,9 +77,14 @@ impl RegionWorkerLoop { // Notifies waiters. request.on_success(); - // Handle pending DDL requests for the region. - if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) { + // Handle pending requests for the region. + if let Some((ddl_requests, write_requests)) = + self.flush_scheduler.on_flush_success(region_id) + { + // Perform DDLs first because they require empty memtables. self.handle_ddl_requests(ddl_requests).await; + // Handle pending write requests, we don't stall these requests. + self.handle_write_requests(write_requests, false).await; } // Handle stalled requests. @@ -97,9 +104,7 @@ impl RegionWorkerLoop { sender: Option>>, ) { let Some(region) = self.regions.get_region(region_id) else { - if let Some(sender) = sender { - let _ = sender.send(RegionNotFoundSnafu { region_id }.fail()); - } + send_result(sender, RegionNotFoundSnafu { region_id }.fail()); return; }; @@ -174,7 +179,12 @@ impl RegionWorkerLoop { Ok(()) } - fn new_flush_task(&self, region: &MitoRegionRef, reason: FlushReason) -> RegionFlushTask { + /// Create a flush task with specific `reason` for the `region`. + pub(crate) fn new_flush_task( + &self, + region: &MitoRegionRef, + reason: FlushReason, + ) -> RegionFlushTask { // TODO(yingwen): metrics for flush requested. RegionFlushTask { region_id: region.region_id, diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index a882d12500c9..bba18d3d22f7 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -17,16 +17,14 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use common_query::Output; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use tokio::sync::oneshot::Sender; use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::{SenderWriteRequest, WriteRequest}; -use crate::worker::RegionWorkerLoop; +use crate::worker::{send_result, RegionWorkerLoop}; impl RegionWorkerLoop { /// Takes and handles all write requests. @@ -93,6 +91,15 @@ impl RegionWorkerLoop { for mut sender_req in write_requests { let region_id = sender_req.request.region_id; + // If region is waiting for alteration, add requests to pending writes. + if self.flush_scheduler.has_pending_ddls(region_id) { + // TODO(yingwen): consider adding some metrics for this. + // Safety: The region has pending ddls. + self.flush_scheduler + .add_write_request_to_pending(sender_req); + continue; + } + // Checks whether the region exists and is it stalling. if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { let Some(region) = self.regions.get_region(region_id) else { @@ -167,11 +174,3 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad Ok(()) } - -/// Send result to the request. -fn send_result(sender: Option>>, res: Result) { - if let Some(sender) = sender { - // Ignore send result. - let _ = sender.send(res); - } -} diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 716dc647c3e4..1f2afadbbac1 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -32,6 +32,7 @@ use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use snafu::{ensure, Location, OptionExt, ResultExt, Snafu}; +use crate::region_request::{AddColumn, AddColumnLocation, AlterKind}; use crate::storage::{ColumnId, RegionId}; pub type Result = std::result::Result; @@ -127,6 +128,10 @@ pub struct RegionMetadata { /// Immutable and unique id of a region. pub region_id: RegionId, + /// Current version of the region schema. + /// + /// The version starts from 0. Altering the schema bumps the version. + pub schema_version: u64, } pub type RegionMetadataRef = Arc; @@ -142,6 +147,7 @@ impl<'de> Deserialize<'de> for RegionMetadata { column_metadatas: Vec, primary_key: Vec, region_id: RegionId, + schema_version: u64, } let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?; @@ -155,6 +161,7 @@ impl<'de> Deserialize<'de> for RegionMetadata { column_metadatas: without_schema.column_metadatas, primary_key: without_schema.primary_key, region_id: without_schema.region_id, + schema_version: without_schema.schema_version, }) } } @@ -378,6 +385,7 @@ pub struct RegionMetadataBuilder { region_id: RegionId, column_metadatas: Vec, primary_key: Vec, + schema_version: u64, } impl RegionMetadataBuilder { @@ -387,31 +395,50 @@ impl RegionMetadataBuilder { region_id: id, column_metadatas: vec![], primary_key: vec![], + schema_version: 0, } } - /// Create a builder from existing [RegionMetadata]. + /// Creates a builder from existing [RegionMetadata]. pub fn from_existing(existing: RegionMetadata) -> Self { Self { column_metadatas: existing.column_metadatas, primary_key: existing.primary_key, region_id: existing.region_id, + schema_version: existing.schema_version, } } - /// Push a new column metadata to this region's metadata. + /// Pushes a new column metadata to this region's metadata. pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self { self.column_metadatas.push(column_metadata); self } - /// Set the primary key of the region. + /// Sets the primary key of the region. pub fn primary_key(&mut self, key: Vec) -> &mut Self { self.primary_key = key; self } - /// Consume the builder and build a [RegionMetadata]. + /// Increases the schema version by 1. + pub fn bump_version(&mut self) -> &mut Self { + self.schema_version += 1; + self + } + + /// Applies the alter `kind` to the builder. + /// + /// The `kind` should be valid. + pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> { + match kind { + AlterKind::AddColumns { columns } => self.add_columns(columns)?, + AlterKind::DropColumns { names } => self.drop_columns(&names), + } + Ok(self) + } + + /// Consumes the builder and build a [RegionMetadata]. pub fn build(self) -> Result { let skipped = SkippedFields::new(&self.column_metadatas)?; @@ -422,12 +449,58 @@ impl RegionMetadataBuilder { column_metadatas: self.column_metadatas, primary_key: self.primary_key, region_id: self.region_id, + schema_version: self.schema_version, }; meta.validate()?; Ok(meta) } + + /// Adds columns to the metadata. + fn add_columns(&mut self, columns: Vec) -> Result<()> { + for add_column in columns { + let column_id = add_column.column_metadata.column_id; + let semantic_type = add_column.column_metadata.semantic_type; + match add_column.location { + None => { + self.column_metadatas.push(add_column.column_metadata); + } + Some(AddColumnLocation::First) => { + self.column_metadatas.insert(0, add_column.column_metadata); + } + Some(AddColumnLocation::After { column_name }) => { + let pos = self + .column_metadatas + .iter() + .position(|col| col.column_schema.name == column_name) + .context(InvalidRegionRequestSnafu { + region_id: self.region_id, + err: format!( + "column {} not found, failed to add column {} after it", + column_name, add_column.column_metadata.column_schema.name + ), + })?; + // Insert after pos. + self.column_metadatas + .insert(pos + 1, add_column.column_metadata); + } + } + if semantic_type == SemanticType::Tag { + // For a new tag, we extend the primary key. + self.primary_key.push(column_id); + } + } + + Ok(()) + } + + /// Drops columns from the metadata. + fn drop_columns(&mut self, names: &[String]) { + let name_set: HashSet<_> = names.iter().collect(); + self.column_metadatas + .retain(|col| !name_set.contains(&col.column_schema.name)); + } } /// Fields skipped in serialization. @@ -497,7 +570,7 @@ pub enum MetadataError { }, #[snafu(display( - "Failed to convert with struct from datatypes, location: {}, source: {}", + "Failed to convert struct from datatypes, location: {}, source: {}", location, source ))] @@ -506,8 +579,20 @@ pub enum MetadataError { source: datatypes::error::Error, }, - #[snafu(display("Invalid raw region request: {err}, at {location}"))] + #[snafu(display("Invalid raw region request, err: {}, location: {}", err, location))] InvalidRawRegionRequest { err: String, location: Location }, + + #[snafu(display( + "Invalid region request, region_id: {}, err: {}, location: {}", + region_id, + err, + location + ))] + InvalidRegionRequest { + region_id: RegionId, + err: String, + location: Location, + }, } impl ErrorExt for MetadataError { @@ -812,4 +897,113 @@ mod test { "unexpected err: {err}", ); } + + #[test] + fn test_bump_version() { + let mut region_metadata = build_test_region_metadata(); + let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone()); + builder.bump_version(); + let new_meta = builder.build().unwrap(); + region_metadata.schema_version += 1; + assert_eq!(region_metadata, new_meta); + } + + fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata { + let semantic_type = if is_tag { + SemanticType::Tag + } else { + SemanticType::Field + }; + ColumnMetadata { + column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true), + semantic_type, + column_id, + } + } + + fn check_columns(metadata: &RegionMetadata, names: &[&str]) { + let actual: Vec<_> = metadata + .column_metadatas + .iter() + .map(|col| &col.column_schema.name) + .collect(); + assert_eq!(names, actual); + } + + #[test] + fn test_alter() { + // a (tag), b (field), c (ts) + let metadata = build_test_region_metadata(); + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: new_column_metadata("d", true, 4), + location: None, + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "c", "d"]); + assert_eq!([1, 4], &metadata.primary_key[..]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: new_column_metadata("e", false, 5), + location: Some(AddColumnLocation::First), + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["e", "a", "b", "c", "d"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: new_column_metadata("f", false, 6), + location: Some(AddColumnLocation::After { + column_name: "b".to_string(), + }), + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: new_column_metadata("g", false, 7), + location: Some(AddColumnLocation::After { + column_name: "d".to_string(), + }), + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::DropColumns { + names: vec!["g".to_string(), "e".to_string()], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "f", "c", "d"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::DropColumns { + names: vec!["a".to_string()], + }) + .unwrap(); + // Build returns error as the primary key has more columns. + let err = builder.build().unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 9d34727ef6bf..7fb65cdc6e18 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use api::v1::region::region_request; -use api::v1::Rows; -use snafu::OptionExt; +use api::v1::add_column_location::LocationType; +use api::v1::region::{alter_request, region_request, AlterRequest}; +use api::v1::{self, Rows, SemanticType}; +use snafu::{ensure, OptionExt}; -use crate::metadata::{ColumnMetadata, InvalidRawRegionRequestSnafu, MetadataError}; +use crate::metadata::{ + ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError, + RegionMetadata, Result, +}; use crate::path_utils::region_dir; -use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest}; +use crate::storage::{ColumnId, RegionId, ScanRequest}; #[derive(Debug)] pub enum RegionRequest { @@ -39,11 +43,7 @@ pub enum RegionRequest { impl RegionRequest { /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id. /// Inserts/Deletes request might become multiple requests. Others are one-to-one. - // TODO: implement alter request - #[allow(unreachable_code)] - pub fn try_from_request_body( - body: region_request::Body, - ) -> Result, MetadataError> { + pub fn try_from_request_body(body: region_request::Body) -> Result> { match body { region_request::Body::Inserts(inserts) => Ok(inserts .requests @@ -68,7 +68,7 @@ impl RegionRequest { .column_defs .into_iter() .map(ColumnMetadata::try_from_column_def) - .collect::, _>>()?; + .collect::>>()?; let region_id = create.region_id.into(); let region_dir = region_dir(&create.catalog, &create.schema, region_id); Ok(vec![( @@ -103,25 +103,10 @@ impl RegionRequest { close.region_id.into(), Self::Close(RegionCloseRequest {}), )]), - region_request::Body::Alter(alter) => { - let kind = alter.kind.context(InvalidRawRegionRequestSnafu { - err: "'kind' is absent", - })?; - Ok(vec![( - alter.region_id.into(), - Self::Alter(RegionAlterRequest { - request: AlterRequest { - operation: kind.try_into().map_err(|e| { - InvalidRawRegionRequestSnafu { - err: format!("{e}"), - } - .build() - })?, - version: alter.schema_version as _, - }, - }), - )]) - } + region_request::Body::Alter(alter) => Ok(vec![( + alter.region_id.into(), + Self::Alter(RegionAlterRequest::try_from(alter)?), + )]), region_request::Body::Flush(flush) => Ok(vec![( flush.region_id.into(), Self::Flush(RegionFlushRequest {}), @@ -189,9 +174,236 @@ pub struct RegionOpenRequest { #[derive(Debug)] pub struct RegionCloseRequest {} -#[derive(Debug)] +/// Alter metadata of a region. +#[derive(Debug, PartialEq, Eq)] pub struct RegionAlterRequest { - pub request: AlterRequest, + /// The version of the schema before applying the alteration. + pub schema_version: u64, + /// Kind of alteration to do. + pub kind: AlterKind, +} + +impl RegionAlterRequest { + /// Checks whether the request is valid, returns an error if it is invalid. + pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { + ensure!( + metadata.schema_version == self.schema_version, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "region schema version {} is not equal to request schema version {}", + metadata.schema_version, self.schema_version + ), + } + ); + + self.kind.validate(metadata)?; + + Ok(()) + } +} + +impl TryFrom for RegionAlterRequest { + type Error = MetadataError; + + fn try_from(value: AlterRequest) -> Result { + let kind = value.kind.context(InvalidRawRegionRequestSnafu { + err: "missing kind in AlterRequest", + })?; + + let kind = AlterKind::try_from(kind)?; + Ok(RegionAlterRequest { + schema_version: value.schema_version, + kind, + }) + } +} + +/// Kind of the alteration. +#[derive(Debug, PartialEq, Eq)] +pub enum AlterKind { + /// Add columns to the region. + AddColumns { + /// Columns to add. + columns: Vec, + }, + /// Drop columns from the region, only fields are allowed to drop. + DropColumns { + /// Name of columns to drop. + names: Vec, + }, +} + +impl AlterKind { + /// Returns an error if the the alter kind is invalid. + pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { + match self { + AlterKind::AddColumns { columns } => { + let mut names = HashSet::with_capacity(columns.len()); + for col_to_add in columns { + ensure!( + !names.contains(&col_to_add.column_metadata.column_schema.name), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "add column {} more than once", + col_to_add.column_metadata.column_schema.name + ), + } + ); + col_to_add.validate(metadata)?; + names.insert(&col_to_add.column_metadata.column_schema.name); + } + } + AlterKind::DropColumns { names } => { + for name in names { + Self::validate_column_to_drop(name, metadata)?; + } + } + } + Ok(()) + } + + /// Returns an error if the column to drop is invalid. + fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> { + let column = metadata + .column_by_name(name) + .with_context(|| InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!("column {} does not exist", name), + })?; + ensure!( + column.semantic_type == SemanticType::Field, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!("column {} is not a field and could not be dropped", name), + } + ); + Ok(()) + } +} + +impl TryFrom for AlterKind { + type Error = MetadataError; + + fn try_from(kind: alter_request::Kind) -> Result { + let alter_kind = match kind { + alter_request::Kind::AddColumns(x) => { + let columns = x + .add_columns + .into_iter() + .map(|x| x.try_into()) + .collect::>>()?; + AlterKind::AddColumns { columns } + } + alter_request::Kind::DropColumns(x) => { + let names = x.drop_columns.into_iter().map(|x| x.name).collect(); + AlterKind::DropColumns { names } + } + }; + + Ok(alter_kind) + } +} + +/// Adds a column. +#[derive(Debug, PartialEq, Eq)] +pub struct AddColumn { + /// Metadata of the column to add. + pub column_metadata: ColumnMetadata, + /// Location to add the column. If location is None, the region adds + /// the column to the last. + pub location: Option, +} + +impl AddColumn { + /// Returns an error if the column to add is invalid. + pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { + ensure!( + self.column_metadata.column_schema.is_nullable() + || self + .column_metadata + .column_schema + .default_constraint() + .is_some(), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "no default value for column {}", + self.column_metadata.column_schema.name + ), + } + ); + ensure!( + metadata + .column_by_name(&self.column_metadata.column_schema.name) + .is_none(), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column {} already exists", + self.column_metadata.column_schema.name + ), + } + ); + + Ok(()) + } +} + +impl TryFrom for AddColumn { + type Error = MetadataError; + + fn try_from(add_column: v1::region::AddColumn) -> Result { + let column_def = add_column + .column_def + .context(InvalidRawRegionRequestSnafu { + err: "missing column_def in AddColumn", + })?; + + let column_metadata = ColumnMetadata::try_from_column_def(column_def)?; + let location = add_column + .location + .map(AddColumnLocation::try_from) + .transpose()?; + + Ok(AddColumn { + column_metadata, + location, + }) + } +} + +/// Location to add a column. +#[derive(Debug, PartialEq, Eq)] +pub enum AddColumnLocation { + /// Add the column to the first position of columns. + First, + /// Add the column after specific column. + After { + /// Add the column after this column. + column_name: String, + }, +} + +impl TryFrom for AddColumnLocation { + type Error = MetadataError; + + fn try_from(location: v1::AddColumnLocation) -> Result { + let location_type = LocationType::from_i32(location.location_type).context( + InvalidRawRegionRequestSnafu { + err: format!("unknown location type {}", location.location_type), + }, + )?; + let add_column_location = match location_type { + LocationType::First => AddColumnLocation::First, + LocationType::After => AddColumnLocation::After { + column_name: location.after_column_name, + }, + }; + + Ok(add_column_location) + } } #[derive(Debug)] @@ -199,3 +411,297 @@ pub struct RegionFlushRequest {} #[derive(Debug)] pub struct RegionCompactRequest {} + +#[cfg(test)] +mod tests { + use api::v1::region::RegionColumnDef; + use api::v1::{ColumnDataType, ColumnDef}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + + use super::*; + use crate::metadata::RegionMetadataBuilder; + + #[test] + fn test_from_proto_location() { + let proto_location = v1::AddColumnLocation { + location_type: LocationType::First as i32, + after_column_name: "".to_string(), + }; + let location = AddColumnLocation::try_from(proto_location).unwrap(); + assert_eq!(location, AddColumnLocation::First); + + let proto_location = v1::AddColumnLocation { + location_type: 10, + after_column_name: "".to_string(), + }; + AddColumnLocation::try_from(proto_location).unwrap_err(); + + let proto_location = v1::AddColumnLocation { + location_type: LocationType::After as i32, + after_column_name: "a".to_string(), + }; + let location = AddColumnLocation::try_from(proto_location).unwrap(); + assert_eq!( + location, + AddColumnLocation::After { + column_name: "a".to_string() + } + ); + } + + #[test] + fn test_from_none_proto_add_column() { + AddColumn::try_from(v1::region::AddColumn { + column_def: None, + location: None, + }) + .unwrap_err(); + } + + #[test] + fn test_from_proto_alter_request() { + RegionAlterRequest::try_from(AlterRequest { + region_id: 0, + schema_version: 1, + kind: None, + }) + .unwrap_err(); + + let request = RegionAlterRequest::try_from(AlterRequest { + region_id: 0, + schema_version: 1, + kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns { + add_columns: vec![v1::region::AddColumn { + column_def: Some(RegionColumnDef { + column_def: Some(ColumnDef { + name: "a".to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + }), + column_id: 1, + }), + location: Some(v1::AddColumnLocation { + location_type: LocationType::First as i32, + after_column_name: "".to_string(), + }), + }], + })), + }) + .unwrap(); + + assert_eq!( + request, + RegionAlterRequest { + schema_version: 1, + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "a", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }, + location: Some(AddColumnLocation::First), + }] + }, + } + ); + } + + fn new_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .primary_key(vec![2]); + builder.build().unwrap() + } + + #[test] + fn test_add_column_validate() { + let metadata = new_metadata(); + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + } + .validate(&metadata) + .unwrap(); + + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + } + .validate(&metadata) + .unwrap_err(); + + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + } + .validate(&metadata) + .unwrap_err(); + } + + #[test] + fn test_add_duplicate_columns() { + let kind = AlterKind::AddColumns { + columns: vec![ + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + }, + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 5, + }, + location: None, + }, + ], + }; + let metadata = new_metadata(); + kind.validate(&metadata).unwrap_err(); + } + + #[test] + fn test_validate_drop_column() { + let metadata = new_metadata(); + AlterKind::DropColumns { + names: vec!["xxxx".to_string()], + } + .validate(&metadata) + .unwrap_err(); + AlterKind::DropColumns { + names: vec!["tag_0".to_string()], + } + .validate(&metadata) + .unwrap_err(); + AlterKind::DropColumns { + names: vec!["field_0".to_string()], + } + .validate(&metadata) + .unwrap(); + } + + #[test] + fn test_validate_schema_version() { + let mut metadata = new_metadata(); + metadata.schema_version = 2; + + RegionAlterRequest { + schema_version: 1, + kind: AlterKind::DropColumns { + names: vec!["field_0".to_string()], + }, + } + .validate(&metadata) + .unwrap_err(); + } + + #[test] + fn test_validate_add_columns() { + let kind = AlterKind::AddColumns { + columns: vec![ + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + }, + AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "field_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 5, + }, + location: None, + }, + ], + }; + let request = RegionAlterRequest { + schema_version: 1, + kind, + }; + let mut metadata = new_metadata(); + metadata.schema_version = 1; + request.validate(&metadata).unwrap(); + } +}