Skip to content

Commit

Permalink
feat: wait for the opening region
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 28, 2024
1 parent f0ed8e2 commit 6e1b526
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;

use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
use crate::error::{InternalSnafu, InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
Expand Down Expand Up @@ -249,7 +249,7 @@ impl EngineInner {
let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?;
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)?
receiver.await.context(RecvSnafu)?.context(InternalSnafu)
}

/// Handles the scan `request` and returns a [ScanRegion].
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ pub enum Error {
location: Location,
},

#[snafu(display(""))]
Internal {
#[snafu(implicit)]
location: Location,
source: Arc<Error>,
},

#[snafu(display("Failed to stop scheduler"))]
StopScheduler {
#[snafu(source)]
Expand Down Expand Up @@ -725,6 +732,7 @@ pub enum Error {
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type ArcResult<T, E = Error> = std::result::Result<T, Arc<E>>;

impl Error {
/// Returns true if we need to fill default value for a region.
Expand Down Expand Up @@ -783,6 +791,7 @@ impl ErrorExt for Error {
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
Internal { source, .. } => source.status_code(),
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
Expand Down
33 changes: 22 additions & 11 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

Expand All @@ -35,7 +36,7 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;

Expand Down Expand Up @@ -474,26 +475,36 @@ pub(crate) type RegionMapRef = Arc<RegionMap>;
/// Opening regions
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
regions: RwLock<HashSet<RegionId>>,
regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
/// Returns true if the region exists.
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
let regions = self.regions.read().unwrap();
regions.contains(&region_id)
/// Returns `None` if it's opening.
pub(crate) fn wait_for_existing_region_opening(
&self,
region_id: RegionId,
sender: OptionOutputTx,
) -> Option<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
match regions.entry(region_id) {
Entry::Occupied(mut senders) => {
senders.get_mut().push(sender);
None
}
Entry::Vacant(_) => Some(sender),
}
}

/// Inserts a new region into the map.
pub(crate) fn insert_region(&self, region: RegionId) {
pub(crate) fn insert_region(&self, region: RegionId, sender: OptionOutputTx) {
let mut regions = self.regions.write().unwrap();
regions.insert(region);
regions.insert(region, vec![sender]);
}

/// Remove region by id.
pub(crate) fn remove_region(&self, region_id: RegionId) {
pub(crate) fn remove_region(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id);
regions.remove(&region_id).unwrap_or_default()
}
}

Expand Down
27 changes: 20 additions & 7 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result,
ArcResult, CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error,
FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
Expand Down Expand Up @@ -384,16 +384,22 @@ pub(crate) fn validate_proto_value(

/// Oneshot output result sender.
#[derive(Debug)]
pub(crate) struct OutputTx(Sender<Result<AffectedRows>>);
pub(crate) struct OutputTx(Sender<ArcResult<AffectedRows, Error>>);

impl OutputTx {
/// Creates a new output sender.
pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
pub(crate) fn new(sender: Sender<ArcResult<AffectedRows, Error>>) -> OutputTx {
OutputTx(sender)
}

/// Sends the `result`.
pub(crate) fn send(self, result: Result<AffectedRows>) {
// Ignores send result.
let _ = self.0.send(result.map_err(Arc::new));
}

/// Sends the `result`.
pub(crate) fn send_arc(self, result: ArcResult<AffectedRows>) {
// Ignores send result.
let _ = self.0.send(result);
}
Expand Down Expand Up @@ -428,14 +434,21 @@ impl OptionOutputTx {
}
}

/// Sends the `result` and consumes the sender.
pub(crate) fn send_arc(mut self, result: ArcResult<AffectedRows>) {
if let Some(sender) = self.0.take() {
sender.send_arc(result);
}
}

/// Takes the inner sender.
pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
self.0.take()
}
}

impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
fn from(sender: Sender<Result<AffectedRows>>) -> Self {
impl From<Sender<ArcResult<AffectedRows>>> for OptionOutputTx {
fn from(sender: Sender<ArcResult<AffectedRows>>) -> Self {
Self::new(Some(OutputTx::new(sender)))
}
}
Expand Down Expand Up @@ -501,7 +514,7 @@ impl WorkerRequest {
pub(crate) fn try_from_region_request(
region_id: RegionId,
value: RegionRequest,
) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
) -> Result<(WorkerRequest, Receiver<ArcResult<AffectedRows>>)> {
let (sender, receiver) = oneshot::channel();
let worker_request = match value {
RegionRequest::Put(v) => {
Expand Down
25 changes: 17 additions & 8 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
request: RegionOpenRequest,
sender: OptionOutputTx,
) {
if self.opening_regions.is_region_exists(region_id) {
sender.send(Ok(0));
let Some(sender) = self
.opening_regions
.wait_for_existing_region_opening(region_id, sender)
else {
return;
};

if self.regions.is_region_exists(region_id) {
sender.send(Ok(0));
return;
Expand Down Expand Up @@ -101,26 +104,32 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};

let regions = self.regions.clone();
let wal = self.wal.clone();
let wal: crate::wal::Wal<S> = self.wal.clone();
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
opening_regions.insert_region(region_id);
opening_regions.insert_region(region_id, sender);
common_runtime::spawn_bg(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
REGION_COUNT.inc();

// Insert the MitoRegion into the RegionMap.
// Insert the Region into the RegionMap.
regions.insert_region(Arc::new(region));

sender.send(Ok(0));
let senders = opening_regions.remove_region(region_id);
for sender in senders {
sender.send(Ok(0));
}
}
Err(err) => {
sender.send(Err(err));
let senders = opening_regions.remove_region(region_id);
let err = Arc::new(err);
for sender in senders {
sender.send_arc(Err(err.clone()));
}
}
}
opening_regions.remove_region(region_id);
});
}
}

0 comments on commit 6e1b526

Please sign in to comment.