diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 5bd934abbcdd..15af842799f8 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -19,15 +19,22 @@ use store_api::storage::RegionId; use super::*; use crate::error::Error; use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::worker::request::RegionOptions; #[tokio::test] async fn test_engine_new_stop() { let env = TestEnv::with_prefix("engine-stop"); let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + engine.create_region(request).await.unwrap(); + + // Stop the engine to reject further requests. engine.stop().await.unwrap(); + assert!(!engine.is_region_exists(region_id)); - let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build(); + let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build(); let err = engine.create_region(request).await.unwrap_err(); assert!( matches!(err, Error::WorkerStopped { .. }), @@ -74,3 +81,92 @@ async fn test_engine_create_existing_region() { "unexpected err: {err}" ); } + +#[tokio::test] +async fn test_engine_open_empty() { + let env = TestEnv::with_prefix("open-empty"); + let engine = env.create_engine(MitoConfig::default()).await; + + let err = engine + .open_region(OpenRequest { + region_id: RegionId::new(1, 1), + region_dir: "empty".to_string(), + options: RegionOptions::default(), + }) + .await + .unwrap_err(); + assert!( + matches!(err, Error::RegionNotFound { .. }), + "unexpected err: {err}" + ); +} + +#[tokio::test] +async fn test_engine_open_existing() { + let env = TestEnv::with_prefix("open-exiting"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + let region_dir = request.region_dir.clone(); + engine.create_region(request).await.unwrap(); + + engine + .open_region(OpenRequest { + region_id, + region_dir, + options: RegionOptions::default(), + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_close_region() { + let env = TestEnv::with_prefix("close"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + engine.create_region(request).await.unwrap(); + + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); + assert!(!engine.is_region_exists(region_id)); + + // It's okay to close this region again. + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_reopen_region() { + let env = TestEnv::with_prefix("reopen-region"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + let region_dir = request.region_dir.clone(); + engine.create_region(request).await.unwrap(); + + // Close the region. + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); + + // Open the region again. + engine + .open_region(OpenRequest { + region_id, + region_dir, + options: RegionOptions::default(), + }) + .await + .unwrap(); + assert!(engine.is_region_exists(region_id)); +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 358a121245ad..a143305be057 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -88,6 +88,17 @@ impl RegionMap { let mut regions = self.regions.write().unwrap(); regions.remove(®ion_id); } + + /// List all regions. + pub(crate) fn list_regions(&self) -> Vec { + let regions = self.regions.read().unwrap(); + regions.values().cloned().collect() + } + + /// Clear the map. + pub(crate) fn clear(&self) { + self.regions.write().unwrap().clear(); + } } pub(crate) type RegionMapRef = Arc; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 200f1131bd66..d308f5c128c3 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -313,6 +313,8 @@ impl RegionWorkerLoop { self.handle_requests(&mut buffer).await; } + self.clean().await; + logging::info!("Exit region worker thread {}", self.id); } @@ -378,6 +380,19 @@ impl RegionWorkerLoop { } } } + + // Clean up the worker. + async fn clean(&self) { + // Closes remaining regions. + let regions = self.regions.list_regions(); + for region in regions { + if let Err(e) = region.stop().await { + logging::error!(e; "Failed to stop region {}", region.region_id); + } + } + + self.regions.clear(); + } } #[cfg(test)]