Skip to content

Commit

Permalink
feat(mito): Implement open and close for mito2 regions (GreptimeTeam#…
Browse files Browse the repository at this point in the history
…2052)

* feat: add close request

* feat: handle close and open request

* feat: Implement open

* test: add TestEnv::new

* feat: close region/engine and test

* style: fix clippy

* style: import log macros

* docs: update docs

* docs: add mermaid for manifest manager
  • Loading branch information
evenyag authored and paomian committed Oct 19, 2023
1 parent 659e38d commit e051346
Show file tree
Hide file tree
Showing 17 changed files with 464 additions and 60 deletions.
9 changes: 4 additions & 5 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_telemetry::logging;
use common_telemetry::warn;

/// Default region worker num.
const DEFAULT_NUM_WORKERS: usize = 1;
Expand Down Expand Up @@ -64,16 +64,15 @@ impl MitoConfig {
}
self.num_workers = self.num_workers.next_power_of_two();
if num_workers_before != self.num_workers {
logging::warn!(
warn!(
"Sanitize worker num {} to {}",
num_workers_before,
self.num_workers
num_workers_before, self.num_workers
);
}

// Sanitize channel size.
if self.worker_channel_size == 0 {
logging::warn!("Sanitize channel size 0 to 1");
warn!("Sanitize channel size 0 to 1");
self.worker_channel_size = 1;
}
}
Expand Down
32 changes: 26 additions & 6 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::worker::request::CreateRequest;
use crate::worker::request::{RegionRequest, RequestBody};
use crate::worker::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -56,8 +56,28 @@ impl MitoEngine {
}

/// Creates a new region.
pub async fn create_region(&self, request: CreateRequest) -> Result<()> {
self.inner.create_region(request).await
pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Create(create_request))
.await
}

/// Opens an existing region.
///
/// Returns error if the region does not exist.
pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Open(open_request))
.await
}

/// Closes a region.
///
/// Does nothing if the region is already closed.
pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Close(close_request))
.await
}

/// Returns true if the specific region exists.
Expand Down Expand Up @@ -89,9 +109,9 @@ impl EngineInner {
self.workers.stop().await
}

/// Creates a new region.
async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
let (request, receiver) = RegionRequest::from_body(RequestBody::Create(create_request));
/// Handles [RequestBody] and return its executed result.
async fn handle_request_body(&self, body: RequestBody) -> Result<()> {
let (request, receiver) = RegionRequest::from_body(body);
self.workers.submit_to_worker(request).await?;

receiver.await.context(RecvSnafu)?
Expand Down
112 changes: 107 additions & 5 deletions src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::worker::request::RegionOptions;

#[tokio::test]
async fn test_engine_new_stop() {
let env = TestEnv::new("engine-stop");
let env = TestEnv::with_prefix("engine-stop");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();

// Stop the engine to reject further requests.
engine.stop().await.unwrap();
assert!(!engine.is_region_exists(region_id));

let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build();
let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build();
let err = engine.create_region(request).await.unwrap_err();
assert!(
matches!(err, Error::WorkerStopped { .. }),
Expand All @@ -37,7 +44,7 @@ async fn test_engine_new_stop() {

#[tokio::test]
async fn test_engine_create_new_region() {
let env = TestEnv::new("new-region");
let env = TestEnv::with_prefix("new-region");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
Expand All @@ -49,7 +56,7 @@ async fn test_engine_create_new_region() {

#[tokio::test]
async fn test_engine_create_region_if_not_exists() {
let env = TestEnv::new("create-not-exists");
let env = TestEnv::with_prefix("create-not-exists");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true);
Expand All @@ -61,7 +68,7 @@ async fn test_engine_create_region_if_not_exists() {

#[tokio::test]
async fn test_engine_create_existing_region() {
let env = TestEnv::new("create-existing");
let env = TestEnv::with_prefix("create-existing");
let engine = env.create_engine(MitoConfig::default()).await;

let builder = CreateRequestBuilder::new(RegionId::new(1, 1));
Expand All @@ -74,3 +81,98 @@ async fn test_engine_create_existing_region() {
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_open_empty() {
let env = TestEnv::with_prefix("open-empty");
let engine = env.create_engine(MitoConfig::default()).await;

let err = engine
.open_region(OpenRequest {
region_id: RegionId::new(1, 1),
region_dir: "empty".to_string(),
options: RegionOptions::default(),
})
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionNotFound { .. }),
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_open_existing() {
let env = TestEnv::with_prefix("open-exiting");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();

engine
.open_region(OpenRequest {
region_id,
region_dir,
options: RegionOptions::default(),
})
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_close_region() {
let env = TestEnv::with_prefix("close");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
// It's okay to close a region doesn't exist.
engine
.close_region(CloseRequest { region_id })
.await
.unwrap();

let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();

engine
.close_region(CloseRequest { region_id })
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));

// It's okay to close this region again.
engine
.close_region(CloseRequest { region_id })
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_reopen_region() {
let env = TestEnv::with_prefix("reopen-region");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();

// Close the region.
engine
.close_region(CloseRequest { region_id })
.await
.unwrap();

// Open the region again.
engine
.open_region(OpenRequest {
region_id,
region_dir,
options: RegionOptions::default(),
})
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
}
22 changes: 21 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ pub enum Error {
source: parquet::errors::ParquetError,
location: Location,
},

#[snafu(display("Region {} not found, location: {}", region_id, location))]
RegionNotFound {
region_id: RegionId,
location: Location,
},

#[snafu(display(
"Region {} is corrupted, reason: {}, location: {}",
region_id,
reason,
location
))]
RegionCorrupted {
region_id: RegionId,
reason: String,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -173,7 +191,9 @@ impl ErrorExt for Error {
| SerdeJson { .. }
| Utf8 { .. }
| RegionExists { .. }
| NewRecordBatch { .. } => StatusCode::Unexpected,
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
StatusCode::InvalidArguments
}
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,8 @@ mod worker;
///
/// The engine handles DMLs and DDLs in dedicated [workers](crate::worker::WorkerGroup).
///
/// ## Region manifest
///
/// The [RegionManifestManager](crate::manifest::manager::RegionManifestManager) manages metadata of the engine.
///
mod docs {}
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl RegionManifestBuilder {
}
}

/// Check if the builder keeps a [RegionMetadata]
/// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
pub fn contains_metadata(&self) -> bool {
self.metadata.is_some()
}
Expand Down
65 changes: 62 additions & 3 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,67 @@ use crate::metadata::RegionMetadataRef;
// trait MetaAction -> struct RegionMetaActionList
// trait MetaActionIterator -> struct MetaActionIteratorImpl

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
/// metadata.
///
/// ```mermaid
/// classDiagram
/// class RegionManifestManager {
/// -ManifestObjectStore store
/// -RegionManifestOptions options
/// -RegionManifest manifest
/// +new() RegionManifestManager
/// +open() Option~RegionManifestManager~
/// +stop()
/// +update(RegionMetaActionList action_list) ManifestVersion
/// +manifest() RegionManifest
/// }
/// class ManifestObjectStore {
/// -ObjectStore object_store
/// }
/// class RegionChange {
/// -RegionMetadataRef metadata
/// }
/// class RegionEdit {
/// -VersionNumber regoin_version
/// -Vec~FileMeta~ files_to_add
/// -Vec~FileMeta~ files_to_remove
/// -SequenceNumber flushed_sequence
/// }
/// class RegionRemove {
/// -RegionId region_id
/// }
/// RegionManifestManager o-- ManifestObjectStore
/// RegionManifestManager o-- RegionManifest
/// RegionManifestManager o-- RegionManifestOptions
/// RegionManifestManager -- RegionMetaActionList
/// RegionManifestManager -- RegionCheckpoint
/// ManifestObjectStore o-- ObjectStore
/// RegionMetaActionList o-- RegionMetaAction
/// RegionMetaAction o-- ProtocolAction
/// RegionMetaAction o-- RegionChange
/// RegionMetaAction o-- RegionEdit
/// RegionMetaAction o-- RegionRemove
/// RegionChange o-- RegionMetadata
/// RegionEdit o-- FileMeta
///
/// class RegionManifest {
/// -RegionMetadataRef metadata
/// -HashMap&lt;FileId, FileMeta&gt; files
/// -ManifestVersion manifest_version
/// }
/// class RegionMetadata
/// class FileMeta
/// RegionManifest o-- RegionMetadata
/// RegionManifest o-- FileMeta
///
/// class RegionCheckpoint {
/// -ManifestVersion last_version
/// -Option~RegionManifest~ checkpoint
/// }
/// RegionCheckpoint o-- RegionManifest
/// ```
#[derive(Debug)]
pub struct RegionManifestManager {
inner: RwLock<RegionManifestManagerInner>,
Expand Down Expand Up @@ -377,7 +436,7 @@ mod test {
#[tokio::test]
async fn create_manifest_manager() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new("");
let env = TestEnv::new();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
Expand All @@ -389,7 +448,7 @@ mod test {

#[tokio::test]
async fn open_manifest_manager() {
let env = TestEnv::new("");
let env = TestEnv::new();
// Try to opens an empty manifest.
assert!(env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
Expand Down Expand Up @@ -420,7 +479,7 @@ mod test {
#[tokio::test]
async fn region_change_add_column() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new("");
let env = TestEnv::new();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
Expand Down
Loading

0 comments on commit e051346

Please sign in to comment.