Skip to content

Commit

Permalink
feat: wait for the opening region
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 28, 2024
1 parent f0ed8e2 commit 1a55292
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 21 deletions.
5 changes: 5 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}

/// Returns true if the specific region exists.
pub fn is_region_opening(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_opening(region_id)
}

/// Returns the region disk/memory usage information.
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
let region = self
Expand Down
86 changes: 86 additions & 0 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;

use crate::config::MitoConfig;
use crate::error;
use crate::test_util::{
build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};
Expand Down Expand Up @@ -319,3 +321,87 @@ async fn test_open_region_skip_wal_replay() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_open_region_wait_for_opening_region_ok() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
let (tx, rx) = oneshot::channel();
let opening_regions = worker.opening_regions().clone();
opening_regions.insert_sender(region_id, tx.into());
assert!(engine.is_region_opening(region_id));

let handle_open = tokio::spawn(async move {
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
});

// Wait for conditions
while opening_regions.sender_len(region_id) != 2 {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let senders = opening_regions.remove_sender(region_id);
for sender in senders {
sender.send(Ok(0));
}

assert_eq!(handle_open.await.unwrap().unwrap().affected_rows, 0);
assert_eq!(rx.await.unwrap().unwrap(), 0);
}

#[tokio::test]
async fn test_open_region_wait_for_opening_region_err() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-err");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
let (tx, rx) = oneshot::channel();
let opening_regions = worker.opening_regions().clone();
opening_regions.insert_sender(region_id, tx.into());
assert!(engine.is_region_opening(region_id));

let handle_open = tokio::spawn(async move {
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
});

// Wait for conditions
while opening_regions.sender_len(region_id) != 2 {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let senders = opening_regions.remove_sender(region_id);
for sender in senders {
sender.send(Err(error::RegionNotFoundSnafu { region_id }.build()));
}

assert_eq!(
handle_open.await.unwrap().unwrap_err().status_code(),
StatusCode::RegionNotFound
);
assert_eq!(
rx.await.unwrap().unwrap_err().status_code(),
StatusCode::RegionNotFound
);
}
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to open region"))]
OpenRegion {
#[snafu(implicit)]
location: Location,
source: Arc<Error>,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -783,6 +790,7 @@ impl ErrorExt for Error {
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
OpenRegion { source, .. } => source.status_code(),
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
Expand Down
43 changes: 35 additions & 8 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

Expand All @@ -35,7 +36,7 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;

Expand Down Expand Up @@ -474,26 +475,52 @@ pub(crate) type RegionMapRef = Arc<RegionMap>;
/// Opening regions
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
regions: RwLock<HashSet<RegionId>>,
regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
/// Registers `sender` for an opening region; Otherwise, it returns `None`.
pub(crate) fn wait_for_opening_region(
&self,
region_id: RegionId,
sender: OptionOutputTx,
) -> Option<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
match regions.entry(region_id) {
Entry::Occupied(mut senders) => {
senders.get_mut().push(sender);
None
}
Entry::Vacant(_) => Some(sender),
}
}

/// Returns true if the region exists.
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
let regions = self.regions.read().unwrap();
regions.contains(&region_id)
regions.contains_key(&region_id)
}

/// Inserts a new region into the map.
pub(crate) fn insert_region(&self, region: RegionId) {
pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
let mut regions = self.regions.write().unwrap();
regions.insert(region);
regions.insert(region, vec![sender]);
}

/// Remove region by id.
pub(crate) fn remove_region(&self, region_id: RegionId) {
pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id);
regions.remove(&region_id).unwrap_or_default()
}

#[cfg(test)]
pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
let regions = self.regions.read().unwrap();
if let Some(senders) = regions.get(&region_id) {
senders.len()
} else {
0
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,11 @@ pub(crate) fn validate_proto_value(

/// Oneshot output result sender.
#[derive(Debug)]
pub(crate) struct OutputTx(Sender<Result<AffectedRows>>);
pub(crate) struct OutputTx(Sender<Result<AffectedRows, Error>>);

impl OutputTx {
/// Creates a new output sender.
pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
pub(crate) fn new(sender: Sender<Result<AffectedRows, Error>>) -> OutputTx {
OutputTx(sender)
}

Expand Down
24 changes: 22 additions & 2 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ impl WorkerGroup {
self.worker(region_id).is_region_exists(region_id)
}

/// Returns true if the specific region is opening.
pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
self.worker(region_id).is_region_opening(region_id)
}

/// Returns region of specific `region_id`.
///
/// This method should not be public.
Expand All @@ -225,7 +230,7 @@ impl WorkerGroup {
}

/// Get worker for specific `region_id`.
fn worker(&self, region_id: RegionId) -> &RegionWorker {
pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
let index = region_id_to_index(region_id, self.workers.len());

&self.workers[index]
Expand Down Expand Up @@ -364,6 +369,7 @@ impl<S: LogStore> WorkerStarter<S> {
/// Starts a region worker and its background thread.
fn start(self) -> RegionWorker {
let regions = Arc::new(RegionMap::default());
let opening_regions = Arc::new(OpeningRegions::default());
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

let running = Arc::new(AtomicBool::new(true));
Expand All @@ -373,7 +379,7 @@ impl<S: LogStore> WorkerStarter<S> {
config: self.config.clone(),
regions: regions.clone(),
dropping_regions: Arc::new(RegionMap::default()),
opening_regions: Arc::new(OpeningRegions::default()),
opening_regions: opening_regions.clone(),
sender: sender.clone(),
receiver,
wal: Wal::new(self.log_store),
Expand Down Expand Up @@ -410,6 +416,7 @@ impl<S: LogStore> WorkerStarter<S> {
RegionWorker {
id: self.id,
regions,
opening_regions,
sender,
handle: Mutex::new(Some(handle)),
running,
Expand All @@ -423,6 +430,8 @@ pub(crate) struct RegionWorker {
id: WorkerId,
/// Regions bound to the worker.
regions: RegionMapRef,
/// The opening regions.
opening_regions: OpeningRegionsRef,
/// Request sender.
sender: Sender<WorkerRequest>,
/// Handle to the worker thread.
Expand Down Expand Up @@ -482,10 +491,21 @@ impl RegionWorker {
self.regions.is_region_exists(region_id)
}

/// Returns true if the region is opening.
fn is_region_opening(&self, region_id: RegionId) -> bool {
self.opening_regions.is_region_exists(region_id)
}

/// Returns region of specific `region_id`.
fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
self.regions.get_region(region_id)
}

#[cfg(test)]
/// Returns the [OpeningRegionsRef].
pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
&self.opening_regions
}
}

impl Drop for RegionWorker {
Expand Down
29 changes: 20 additions & 9 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use store_api::logstore::LogStore;
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;

use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
};
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
Expand Down Expand Up @@ -66,10 +68,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
request: RegionOpenRequest,
sender: OptionOutputTx,
) {
if self.opening_regions.is_region_exists(region_id) {
sender.send(Ok(0));
let Some(sender) = self
.opening_regions
.wait_for_opening_region(region_id, sender)
else {
return;
};

if self.regions.is_region_exists(region_id) {
sender.send(Ok(0));
return;
Expand Down Expand Up @@ -101,26 +106,32 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};

let regions = self.regions.clone();
let wal = self.wal.clone();
let wal: crate::wal::Wal<S> = self.wal.clone();
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
opening_regions.insert_region(region_id);
opening_regions.insert_sender(region_id, sender);
common_runtime::spawn_bg(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
REGION_COUNT.inc();

// Insert the MitoRegion into the RegionMap.
// Insert the Region into the RegionMap.
regions.insert_region(Arc::new(region));

sender.send(Ok(0));
let senders = opening_regions.remove_sender(region_id);
for sender in senders {
sender.send(Ok(0));
}
}
Err(err) => {
sender.send(Err(err));
let senders = opening_regions.remove_sender(region_id);
let err = Arc::new(err);
for sender in senders {
sender.send(Err(err.clone()).context(OpenRegionSnafu));
}
}
}
opening_regions.remove_region(region_id);
});
}
}

0 comments on commit 1a55292

Please sign in to comment.