diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index b55252ec46b8..34c22642799c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -80,7 +80,7 @@ impl RegionServer { RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, - RegionRequest::Write(_) + RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::Alter(_) | RegionRequest::Flush(_) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 4b342318c475..dec53bac08b4 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -14,9 +14,8 @@ //! Mito region engine. -// TODO: migrate test to RegionRequest -// #[cfg(test)] -// mod tests; +#[cfg(test)] +mod tests; use std::sync::Arc; @@ -29,7 +28,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; -use crate::request::RegionTask; +use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -60,6 +59,7 @@ impl MitoEngine { self.inner.stop().await } + /// Handle requests that modify a region. pub async fn handle_request( &self, region_id: RegionId, @@ -73,23 +73,6 @@ impl MitoEngine { pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } - - // /// Write to a region. - // pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { - // write_request.validate()?; - // RequestValidator::write_request(&write_request)?; - - // TODO(yingwen): Fill default values. - // We need to fill default values before writing it to WAL so we can get - // the same default value after reopening the region. - - // let metadata = region.metadata(); - - // write_request.fill_missing_columns(&metadata)?; - // self.inner - // .handle_request_body(RequestBody::Write(write_request)) - // .await - // } } /// Inner struct of [MitoEngine]. @@ -118,7 +101,8 @@ impl EngineInner { // TODO(yingwen): return `Output` instead of `Result<()>`. /// Handles [RequestBody] and return its executed result. async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> { - let (request, receiver) = RegionTask::from_request(region_id, request); + let body = RequestBody::try_from_region_request(region_id, request)?; + let (request, receiver) = RegionTask::from_request(region_id, body); self.workers.submit_to_worker(request).await?; receiver.await.context(RecvSnafu)? diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index f379362d5975..5e7b96bee472 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -14,11 +14,13 @@ //! Tests for mito engine. +use std::collections::HashMap; + +use store_api::region_request::{RegionCloseRequest, RegionOpenRequest}; use store_api::storage::RegionId; use super::*; use crate::error::Error; -use crate::request::RegionOptions; use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] @@ -27,15 +29,21 @@ async fn test_engine_new_stop() { let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new(region_id).build(); - engine.create_region(request).await.unwrap(); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); // Stop the engine to reject further requests. engine.stop().await.unwrap(); assert!(!engine.is_region_exists(region_id)); - let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build(); - let err = engine.create_region(request).await.unwrap_err(); + let request = CreateRequestBuilder::new().build(); + let err = engine + .handle_request(RegionId::new(1, 2), RegionRequest::Create(request)) + .await + .unwrap_err(); assert!( matches!(err, Error::WorkerStopped { .. }), "unexpected err: {err}" @@ -48,8 +56,11 @@ async fn test_engine_create_new_region() { let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new(region_id).build(); - engine.create_region(request).await.unwrap(); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); assert!(engine.is_region_exists(region_id)); } @@ -59,11 +70,18 @@ async fn test_engine_create_region_if_not_exists() { let env = TestEnv::with_prefix("create-not-exists"); let engine = env.create_engine(MitoConfig::default()).await; - let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true); - engine.create_region(builder.build()).await.unwrap(); + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new().create_if_not_exists(true); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); // Create the same region again. - engine.create_region(builder.build()).await.unwrap(); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); } #[tokio::test] @@ -71,11 +89,18 @@ async fn test_engine_create_existing_region() { let env = TestEnv::with_prefix("create-existing"); let engine = env.create_engine(MitoConfig::default()).await; - let builder = CreateRequestBuilder::new(RegionId::new(1, 1)); - engine.create_region(builder.build()).await.unwrap(); + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); // Create the same region again. - let err = engine.create_region(builder.build()).await.unwrap_err(); + let err = engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap_err(); assert!( matches!(err, Error::RegionExists { .. }), "unexpected err: {err}" @@ -88,11 +113,14 @@ async fn test_engine_open_empty() { let engine = env.create_engine(MitoConfig::default()).await; let err = engine - .open_region(OpenRequest { - region_id: RegionId::new(1, 1), - region_dir: "empty".to_string(), - options: RegionOptions::default(), - }) + .handle_request( + RegionId::new(1, 1), + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: "empty".to_string(), + options: HashMap::default(), + }), + ) .await .unwrap_err(); assert!( @@ -107,16 +135,22 @@ async fn test_engine_open_existing() { let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new(region_id).build(); + let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); - engine.create_region(request).await.unwrap(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); engine - .open_region(OpenRequest { + .handle_request( region_id, - region_dir, - options: RegionOptions::default(), - }) + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) .await .unwrap(); } @@ -129,22 +163,26 @@ async fn test_engine_close_region() { let region_id = RegionId::new(1, 1); // It's okay to close a region doesn't exist. engine - .close_region(CloseRequest { region_id }) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); - let request = CreateRequestBuilder::new(region_id).build(); - engine.create_region(request).await.unwrap(); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Close the created region. engine - .close_region(CloseRequest { region_id }) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(region_id)); // It's okay to close this region again. engine - .close_region(CloseRequest { region_id }) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); } @@ -155,23 +193,29 @@ async fn test_engine_reopen_region() { let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new(region_id).build(); + let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); - engine.create_region(request).await.unwrap(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); // Close the region. engine - .close_region(CloseRequest { region_id }) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the region again. engine - .open_region(OpenRequest { + .handle_request( region_id, - region_dir, - options: RegionOptions::default(), - }) + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) .await .unwrap(); assert!(engine.is_region_exists(region_id)); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4e1c2fd4b004..03d7d8c17cc3 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -25,8 +25,11 @@ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value}; use common_base::readable_size::ReadableSize; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; -use store_api::region_request::RegionRequest; -use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; +use store_api::region_request::{ + RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, + RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, +}; +use store_api::storage::{CompactionStrategy, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; @@ -55,41 +58,6 @@ impl Default for RegionOptions { } } -/// Create region request. -#[derive(Debug)] -pub struct CreateRequest { - /// Region to create. - pub region_id: RegionId, - /// Data directory of the region. - pub region_dir: String, - /// Columns in this region. - pub column_metadatas: Vec, - /// Columns in the primary key. - pub primary_key: Vec, - /// Create region if not exists. - pub create_if_not_exists: bool, - /// Options of the created region. - pub options: RegionOptions, -} - -/// Open region request. -#[derive(Debug)] -pub struct OpenRequest { - /// Region to open. - pub region_id: RegionId, - /// Data directory of the region. - pub region_dir: String, - /// Options of the created region. - pub options: RegionOptions, -} - -/// Close region request. -#[derive(Debug)] -pub struct CloseRequest { - /// Region to close. - pub region_id: RegionId, -} - /// Request to write a region. #[derive(Debug)] pub struct WriteRequest { @@ -160,6 +128,7 @@ impl WriteRequest { self.name_to_index.get(name).copied() } + // TODO(yingwen): Check delete schema. /// Checks schema of rows is compatible with schema of the region. /// /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) @@ -366,22 +335,22 @@ pub(crate) struct RegionTask { /// with an enum if we need to carry more information. pub(crate) sender: Option>>, /// Request body. - pub(crate) request: RegionRequest, + pub(crate) body: RequestBody, /// Region identifier. pub(crate) region_id: RegionId, } impl RegionTask { - /// Creates a [RegionTask] and a receiver from [RegionRequest]. + /// Creates a [RegionTask] and a receiver from request body. pub(crate) fn from_request( region_id: RegionId, - request: RegionRequest, + body: RequestBody, ) -> (RegionTask, Receiver>) { let (sender, receiver) = oneshot::channel(); ( RegionTask { sender: Some(sender), - request, + body, region_id, }, receiver, @@ -389,17 +358,46 @@ impl RegionTask { } } -/// Mito Region Engine's request validator -pub(crate) struct RequestValidator; - -impl RequestValidator { - /// Validate the [WriteRequest]. - pub fn write_request(_write_request: &WriteRequest) -> Result<()> { - // - checks whether the request is too large. - // - checks whether each row in rows has the same schema. - // - checks whether each column match the schema in Rows. - // - checks rows don't have duplicate columns. - unimplemented!() +/// Request body of a region task. +/// +/// It validates requests outside of workers. +#[derive(Debug)] +pub(crate) enum RequestBody { + Write(WriteRequest), + Create(RegionCreateRequest), + Drop(RegionDropRequest), + Open(RegionOpenRequest), + Close(RegionCloseRequest), + Alter(RegionAlterRequest), + Flush(RegionFlushRequest), + Compact(RegionCompactRequest), +} + +impl RequestBody { + /// Convert request body from [RegionRequest]. + pub(crate) fn try_from_region_request( + region_id: RegionId, + value: RegionRequest, + ) -> Result { + let body = match value { + RegionRequest::Put(v) => { + let write_request = WriteRequest::new(region_id, OpType::Put, v.rows)?; + RequestBody::Write(write_request) + } + RegionRequest::Delete(v) => { + let write_request = WriteRequest::new(region_id, OpType::Delete, v.rows)?; + RequestBody::Write(write_request) + } + RegionRequest::Create(v) => RequestBody::Create(v), + RegionRequest::Drop(v) => RequestBody::Drop(v), + RegionRequest::Open(v) => RequestBody::Open(v), + RegionRequest::Close(v) => RequestBody::Close(v), + RegionRequest::Alter(v) => RequestBody::Alter(v), + RegionRequest::Flush(v) => RequestBody::Flush(v), + RegionRequest::Compact(v) => RequestBody::Compact(v), + }; + + Ok(body) } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c207f1b7ccc9..8b791c0acbd6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -14,6 +14,7 @@ //! Utilities for testing. +use std::collections::HashMap; use std::sync::Arc; use api::greptime_proto::v1; @@ -28,13 +29,12 @@ use log_store::test_util::log_store_util; use object_store::services::Fs; use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::storage::RegionId; +use store_api::region_request::RegionCreateRequest; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; /// Env to test mito engine. @@ -131,9 +131,8 @@ impl TestEnv { } } -/// Builder to mock a [CreateRequest]. +/// Builder to mock a [RegionCreateRequest]. pub struct CreateRequestBuilder { - region_id: RegionId, region_dir: String, tag_num: usize, field_num: usize, @@ -143,7 +142,6 @@ pub struct CreateRequestBuilder { impl Default for CreateRequestBuilder { fn default() -> Self { CreateRequestBuilder { - region_id: RegionId::default(), region_dir: "test".to_string(), tag_num: 1, field_num: 1, @@ -153,11 +151,8 @@ impl Default for CreateRequestBuilder { } impl CreateRequestBuilder { - pub fn new(region_id: RegionId) -> CreateRequestBuilder { - CreateRequestBuilder { - region_id, - ..Default::default() - } + pub fn new() -> CreateRequestBuilder { + CreateRequestBuilder::default() } pub fn region_dir(mut self, value: &str) -> Self { @@ -180,7 +175,7 @@ impl CreateRequestBuilder { self } - pub fn build(&self) -> CreateRequest { + pub fn build(&self) -> RegionCreateRequest { let mut column_id = 0; let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); let mut primary_key = Vec::with_capacity(self.tag_num); @@ -219,13 +214,14 @@ impl CreateRequestBuilder { column_id, }); - CreateRequest { - region_id: self.region_id, - region_dir: self.region_dir.clone(), + RegionCreateRequest { + // We use empty engine name as we already locates the engine. + engine: String::new(), column_metadatas, primary_key, create_if_not_exists: self.create_if_not_exists, - options: RegionOptions::default(), + options: HashMap::default(), + region_dir: self.region_dir.clone(), } } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index edb8d83726d0..ff15536aa1d9 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -30,7 +30,6 @@ use futures::future::try_join_all; use object_store::ObjectStore; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; @@ -39,7 +38,7 @@ use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; -use crate::request::{RegionTask, WorkerRequest}; +use crate::request::{RegionTask, RequestBody, SenderWriteRequest, WorkerRequest}; use crate::wal::Wal; /// Identifier for a worker. @@ -335,16 +334,16 @@ impl RegionWorkerLoop { /// /// `buffer` should be empty. async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { - let write_requests = Vec::with_capacity(buffer.len()); + let mut write_requests = Vec::with_capacity(buffer.len()); let mut ddl_requests = Vec::with_capacity(buffer.len()); for worker_req in buffer.drain(..) { match worker_req { WorkerRequest::Region(task) => { - if matches!(task.request, RegionRequest::Write(_)) { - // write_requests.push(SenderWriteRequest { - // sender: task.sender, - // request: task.request.into_write_request(), - // }); + if let RequestBody::Write(write_request) = task.body { + write_requests.push(SenderWriteRequest { + sender: task.sender, + request: write_request, + }); } else { ddl_requests.push(task); } @@ -374,16 +373,15 @@ impl RegionWorkerLoop { } for task in ddl_tasks { - let res: std::result::Result<(), crate::error::Error> = match task.request { - RegionRequest::Create(req) => self.handle_create_request(task.region_id, req).await, - RegionRequest::Open(req) => self.handle_open_request(task.region_id, req).await, - RegionRequest::Close(_) => self.handle_close_request(task.region_id).await, - RegionRequest::Write(_) - | RegionRequest::Delete(_) - | RegionRequest::Drop(_) - | RegionRequest::Alter(_) - | RegionRequest::Flush(_) - | RegionRequest::Compact(_) => unreachable!(), + let res: std::result::Result<(), crate::error::Error> = match task.body { + RequestBody::Create(req) => self.handle_create_request(task.region_id, req).await, + RequestBody::Open(req) => self.handle_open_request(task.region_id, req).await, + RequestBody::Close(_) => self.handle_close_request(task.region_id).await, + RequestBody::Write(_) + | RequestBody::Drop(_) + | RequestBody::Alter(_) + | RequestBody::Flush(_) + | RequestBody::Compact(_) => unreachable!(), }; if let Some(sender) = task.sender { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 80bac9529167..5d3becb53fda 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -111,7 +111,7 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad if let Err(e) = request.check_schema(metadata) { if e.is_fill_default() { // TODO(yingwen): Add metrics for this case. - // We need to fill default value again. The write request may be a request + // We need to fill default value. The write request may be a request // sent before changing the schema. request.fill_missing_columns(metadata)?; } else { diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 8f7a460d05aa..818bbe4f20bb 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -21,7 +21,7 @@ use crate::storage::{AlterRequest, ColumnId, ScanRequest}; #[derive(Debug)] pub enum RegionRequest { - Write(RegionWriteRequest), + Put(RegionPutRequest), Delete(RegionDeleteRequest), Create(RegionCreateRequest), Drop(RegionDropRequest), @@ -32,10 +32,10 @@ pub enum RegionRequest { Compact(RegionCompactRequest), } -/// Request to write a region. +/// Request to put data into a region. #[derive(Debug)] -pub struct RegionWriteRequest { - /// Rows to write. +pub struct RegionPutRequest { + /// Rows to put. pub rows: Rows, } @@ -44,9 +44,12 @@ pub struct RegionReadRequest { pub request: ScanRequest, } +/// Request to delete data from a region. #[derive(Debug)] pub struct RegionDeleteRequest { - /// Rows to write. + /// Keys to rows to delete. + /// + /// Each row only contains primary key columns and a time index column. pub rows: Rows, }