Skip to content

Commit

Permalink
feat(mito): Support handling RegionWriteRequest (#2218)
Browse files Browse the repository at this point in the history
* feat: convert region request to worker write request

* chore: remove unused codes

* test: fix tests compiler errors

* chore: remove create/close/open request from worker requests

* chore: add comment

* chore: fix typo
  • Loading branch information
evenyag authored Aug 22, 2023
1 parent be1e13c commit cd3755c
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl RegionServer {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()),
RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()),
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Write(_)
RegionRequest::Put(_)
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
Expand Down
28 changes: 6 additions & 22 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@

//! Mito region engine.
// TODO: migrate test to RegionRequest
// #[cfg(test)]
// mod tests;
#[cfg(test)]
mod tests;

use std::sync::Arc;

Expand All @@ -29,7 +28,7 @@ use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::request::RegionTask;
use crate::request::{RegionTask, RequestBody};
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -60,6 +59,7 @@ impl MitoEngine {
self.inner.stop().await
}

/// Handle requests that modify a region.
pub async fn handle_request(
&self,
region_id: RegionId,
Expand All @@ -73,23 +73,6 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

// /// Write to a region.
// pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
// write_request.validate()?;
// RequestValidator::write_request(&write_request)?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.

// let metadata = region.metadata();

// write_request.fill_missing_columns(&metadata)?;
// self.inner
// .handle_request_body(RequestBody::Write(write_request))
// .await
// }
}

/// Inner struct of [MitoEngine].
Expand Down Expand Up @@ -118,7 +101,8 @@ impl EngineInner {
// TODO(yingwen): return `Output` instead of `Result<()>`.
/// Handles [RequestBody] and return its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> {
let (request, receiver) = RegionTask::from_request(region_id, request);
let body = RequestBody::try_from_region_request(region_id, request)?;
let (request, receiver) = RegionTask::from_request(region_id, body);
self.workers.submit_to_worker(request).await?;

receiver.await.context(RecvSnafu)?
Expand Down
116 changes: 80 additions & 36 deletions src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

//! Tests for mito engine.
use std::collections::HashMap;

use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use store_api::storage::RegionId;

use super::*;
use crate::error::Error;
use crate::request::RegionOptions;
use crate::test_util::{CreateRequestBuilder, TestEnv};

#[tokio::test]
Expand All @@ -27,15 +29,21 @@ async fn test_engine_new_stop() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Stop the engine to reject further requests.
engine.stop().await.unwrap();
assert!(!engine.is_region_exists(region_id));

let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build();
let err = engine.create_region(request).await.unwrap_err();
let request = CreateRequestBuilder::new().build();
let err = engine
.handle_request(RegionId::new(1, 2), RegionRequest::Create(request))
.await
.unwrap_err();
assert!(
matches!(err, Error::WorkerStopped { .. }),
"unexpected err: {err}"
Expand All @@ -48,8 +56,11 @@ async fn test_engine_create_new_region() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

assert!(engine.is_region_exists(region_id));
}
Expand All @@ -59,23 +70,37 @@ async fn test_engine_create_region_if_not_exists() {
let env = TestEnv::with_prefix("create-not-exists");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true);
engine.create_region(builder.build()).await.unwrap();
let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new().create_if_not_exists(true);
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();

// Create the same region again.
engine.create_region(builder.build()).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_create_existing_region() {
let env = TestEnv::with_prefix("create-existing");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1));
engine.create_region(builder.build()).await.unwrap();
let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new();
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();

// Create the same region again.
let err = engine.create_region(builder.build()).await.unwrap_err();
let err = engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionExists { .. }),
"unexpected err: {err}"
Expand All @@ -88,11 +113,14 @@ async fn test_engine_open_empty() {
let engine = env.create_engine(MitoConfig::default()).await;

let err = engine
.open_region(OpenRequest {
region_id: RegionId::new(1, 1),
region_dir: "empty".to_string(),
options: RegionOptions::default(),
})
.handle_request(
RegionId::new(1, 1),
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
}),
)
.await
.unwrap_err();
assert!(
Expand All @@ -107,16 +135,22 @@ async fn test_engine_open_existing() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

engine
.open_region(OpenRequest {
.handle_request(
region_id,
region_dir,
options: RegionOptions::default(),
})
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
}
Expand All @@ -129,22 +163,26 @@ async fn test_engine_close_region() {
let region_id = RegionId::new(1, 1);
// It's okay to close a region doesn't exist.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the created region.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));

// It's okay to close this region again.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
}
Expand All @@ -155,23 +193,29 @@ async fn test_engine_reopen_region() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the region.
engine
.close_region(CloseRequest { region_id })
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again.
engine
.open_region(OpenRequest {
.handle_request(
region_id,
region_dir,
options: RegionOptions::default(),
})
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
Expand Down
Loading

0 comments on commit cd3755c

Please sign in to comment.