Skip to content

Commit

Permalink
feat: close region/engine and test
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jul 31, 2023
1 parent 7704fdd commit e94039c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
98 changes: 97 additions & 1 deletion src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::worker::request::RegionOptions;

#[tokio::test]
async fn test_engine_new_stop() {
let env = TestEnv::with_prefix("engine-stop");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();

// Stop the engine to reject further requests.
engine.stop().await.unwrap();
assert!(!engine.is_region_exists(region_id));

let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build();
let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build();
let err = engine.create_region(request).await.unwrap_err();
assert!(
matches!(err, Error::WorkerStopped { .. }),
Expand Down Expand Up @@ -74,3 +81,92 @@ async fn test_engine_create_existing_region() {
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_open_empty() {
let env = TestEnv::with_prefix("open-empty");
let engine = env.create_engine(MitoConfig::default()).await;

let err = engine
.open_region(OpenRequest {
region_id: RegionId::new(1, 1),
region_dir: "empty".to_string(),
options: RegionOptions::default(),
})
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionNotFound { .. }),
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_open_existing() {
let env = TestEnv::with_prefix("open-exiting");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();

engine
.open_region(OpenRequest {
region_id,
region_dir,
options: RegionOptions::default(),
})
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_close_region() {
let env = TestEnv::with_prefix("close");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
engine.create_region(request).await.unwrap();

engine
.close_region(CloseRequest { region_id })
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));

// It's okay to close this region again.
engine
.close_region(CloseRequest { region_id })
.await
.unwrap();
}

#[tokio::test]
async fn test_engine_reopen_region() {
let env = TestEnv::with_prefix("reopen-region");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new(region_id).build();
let region_dir = request.region_dir.clone();
engine.create_region(request).await.unwrap();

// Close the region.
engine
.close_region(CloseRequest { region_id })
.await
.unwrap();

// Open the region again.
engine
.open_region(OpenRequest {
region_id,
region_dir,
options: RegionOptions::default(),
})
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
}
11 changes: 11 additions & 0 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ impl RegionMap {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id);
}

/// List all regions.
pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
let regions = self.regions.read().unwrap();
regions.values().cloned().collect()
}

/// Clear the map.
pub(crate) fn clear(&self) {
self.regions.write().unwrap().clear();
}
}

pub(crate) type RegionMapRef = Arc<RegionMap>;
15 changes: 15 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ impl<S> RegionWorkerLoop<S> {
self.handle_requests(&mut buffer).await;
}

self.clean().await;

logging::info!("Exit region worker thread {}", self.id);
}

Expand Down Expand Up @@ -378,6 +380,19 @@ impl<S> RegionWorkerLoop<S> {
}
}
}

// Clean up the worker.
async fn clean(&self) {
// Closes remaining regions.
let regions = self.regions.list_regions();
for region in regions {
if let Err(e) = region.stop().await {
logging::error!(e; "Failed to stop region {}", region.region_id);
}
}

self.regions.clear();
}
}

#[cfg(test)]
Expand Down

0 comments on commit e94039c

Please sign in to comment.