Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Support handling RegionWriteRequest #2218

Merged
merged 6 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl RegionServer {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()),
RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()),
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Write(_)
RegionRequest::Put(_)
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
Expand Down
28 changes: 6 additions & 22 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@

//! Mito region engine.

// TODO: migrate test to RegionRequest
// #[cfg(test)]
// mod tests;
#[cfg(test)]
mod tests;

use std::sync::Arc;

Expand All @@ -29,7 +28,7 @@ use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::request::RegionTask;
use crate::request::{RegionTask, RequestBody};
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -60,6 +59,7 @@ impl MitoEngine {
self.inner.stop().await
}

/// Handle requests that modify a region.
pub async fn handle_request(
&self,
region_id: RegionId,
Expand All @@ -73,23 +73,6 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

// /// Write to a region.
// pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
// write_request.validate()?;
// RequestValidator::write_request(&write_request)?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.

// let metadata = region.metadata();

// write_request.fill_missing_columns(&metadata)?;
// self.inner
// .handle_request_body(RequestBody::Write(write_request))
// .await
// }
}

/// Inner struct of [MitoEngine].
Expand Down Expand Up @@ -118,7 +101,8 @@ impl EngineInner {
// TODO(yingwen): return `Output` instead of `Result<()>`.
/// Handles [RequestBody] and return its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> {
let (request, receiver) = RegionTask::from_request(region_id, request);
let body = RequestBody::try_from_region_request(region_id, request)?;
let (request, receiver) = RegionTask::from_request(region_id, body);
self.workers.submit_to_worker(request).await?;

receiver.await.context(RecvSnafu)?
Expand Down
116 changes: 80 additions & 36 deletions src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

//! Tests for mito engine.

use std::collections::HashMap;

use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use store_api::storage::RegionId;

use super::*;
use crate::error::Error;
use crate::request::RegionOptions;
use crate::test_util::{CreateRequestBuilder, TestEnv};

#[tokio::test]
Expand All @@ -27,15 +29,21 @@ async fn test_engine_new_stop() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Stop the engine to reject further requests.
engine.stop().await.unwrap();
assert!(!engine.is_region_exists(region_id));

let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build();
let err = engine.create_region(request).await.unwrap_err();
let request = CreateRequestBuilder::new().build();
let err = engine
.handle_request(RegionId::new(1, 2), RegionRequest::Create(request))
.await
.unwrap_err();
assert!(
matches!(err, Error::WorkerStopped { .. }),
"unexpected err: {err}"
Expand All @@ -48,8 +56,11 @@ async fn test_engine_create_new_region() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

assert!(engine.is_region_exists(region_id));
}
Expand All @@ -59,23 +70,37 @@ async fn test_engine_create_region_if_not_exists() {
let env = TestEnv::with_prefix("create-not-exists");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true);
engine.create_region(builder.build()).await.unwrap();
let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new().create_if_not_exists(true);
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();

// Create the same region again.
engine.create_region(builder.build()).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_create_existing_region() {
let env = TestEnv::with_prefix("create-existing");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1));
engine.create_region(builder.build()).await.unwrap();
let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new();
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();

// Create the same region again.
let err = engine.create_region(builder.build()).await.unwrap_err();
let err = engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionExists { .. }),
"unexpected err: {err}"
Expand All @@ -88,11 +113,14 @@ async fn test_engine_open_empty() {
let engine = env.create_engine(MitoConfig::default()).await;

let err = engine
.open_region(OpenRequest {
region_id: RegionId::new(1, 1),
region_dir: "empty".to_string(),
options: RegionOptions::default(),
})
.handle_request(
RegionId::new(1, 1),
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
}),
)
.await
.unwrap_err();
assert!(
Expand All @@ -107,16 +135,22 @@ async fn test_engine_open_existing() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

engine
.open_region(OpenRequest {
.handle_request(
region_id,
region_dir,
options: RegionOptions::default(),
})
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
}
Expand All @@ -129,22 +163,26 @@ async fn test_engine_close_region() {
let region_id = RegionId::new(1, 1);
// It's okay to close a region doesn't exist.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the created region.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));

// It's okay to close this region again.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
}
Expand All @@ -155,23 +193,29 @@ async fn test_engine_reopen_region() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the region.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again.
engine
.open_region(OpenRequest {
.handle_request(
region_id,
region_dir,
options: RegionOptions::default(),
})
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
Expand Down
Loading