diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index b8f236c7ded2..469ed0a6ccf1 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -409,9 +409,7 @@ impl RegionServerInner { let engine = match region_change { RegionChange::Register(attribute) => match current_region_status { Some(status) => match status.clone() { - RegionEngineWithStatus::Registering(_) => { - return Ok(CurrentEngine::EarlyReturn(0)) - } + RegionEngineWithStatus::Registering(engine) => engine, RegionEngineWithStatus::Deregistering(_) => { return error::RegionBusySnafu { region_id }.fail() } @@ -781,34 +779,32 @@ mod tests { let mut mock_region_server = mock_region_server(); let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME); let engine_name = engine.name(); - mock_region_server.register_engine(engine.clone()); - let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); let create_req = builder.build(); - // Tries to create/open a registering region. mock_region_server.inner.region_map.insert( region_id, RegionEngineWithStatus::Registering(engine.clone()), ); - let response = mock_region_server .handle_request(region_id, RegionRequest::Create(create_req)) .await .unwrap(); assert_eq!(response.affected_rows, 0); - let status = mock_region_server .inner .region_map .get(®ion_id) .unwrap() .clone(); + assert!(matches!(status, RegionEngineWithStatus::Ready(_))); - assert!(matches!(status, RegionEngineWithStatus::Registering(_))); - + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); let response = mock_region_server .handle_request( region_id, @@ -822,14 +818,13 @@ mod tests { .await .unwrap(); assert_eq!(response.affected_rows, 0); - let status = mock_region_server .inner .region_map .get(®ion_id) .unwrap() .clone(); - assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + assert!(matches!(status, RegionEngineWithStatus::Ready(_))); } #[tokio::test] @@ -1020,7 +1015,7 @@ mod tests { region_change: RegionChange::Register(RegionAttribute::Mito), assert: Box::new(|result| { let current_engine = result.unwrap(); - assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + assert_matches!(current_engine, CurrentEngine::Engine(_)); }), }, CurrentEngineTest { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index e0223a5585ee..09bfe2535a12 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -104,6 +104,11 @@ impl MitoEngine { self.inner.workers.is_region_exists(region_id) } + /// Returns true if the specific region exists. + pub fn is_region_opening(&self, region_id: RegionId) -> bool { + self.inner.workers.is_region_opening(region_id) + } + /// Returns the region disk/memory usage information. pub async fn get_region_usage(&self, region_id: RegionId) -> Result { let region = self diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 3cf4a21e561a..c30c29648042 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -24,8 +24,10 @@ use store_api::region_request::{ RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::oneshot; use crate::config::MitoConfig; +use crate::error; use crate::test_util::{ build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, }; @@ -319,3 +321,87 @@ async fn test_open_region_skip_wal_replay() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_open_region_wait_for_opening_region_ok() { + let mut env = TestEnv::with_prefix("wait-for-opening-region-ok"); + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + let worker = engine.inner.workers.worker(region_id); + let (tx, rx) = oneshot::channel(); + let opening_regions = worker.opening_regions().clone(); + opening_regions.insert_sender(region_id, tx.into()); + assert!(engine.is_region_opening(region_id)); + + let handle_open = tokio::spawn(async move { + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: "empty".to_string(), + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + }); + + // Wait for conditions + while opening_regions.sender_len(region_id) != 2 { + tokio::time::sleep(Duration::from_millis(100)).await; + } + + let senders = opening_regions.remove_sender(region_id); + for sender in senders { + sender.send(Ok(0)); + } + + assert_eq!(handle_open.await.unwrap().unwrap().affected_rows, 0); + assert_eq!(rx.await.unwrap().unwrap(), 0); +} + +#[tokio::test] +async fn test_open_region_wait_for_opening_region_err() { + let mut env = TestEnv::with_prefix("wait-for-opening-region-err"); + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + let worker = engine.inner.workers.worker(region_id); + let (tx, rx) = oneshot::channel(); + let opening_regions = worker.opening_regions().clone(); + opening_regions.insert_sender(region_id, tx.into()); + assert!(engine.is_region_opening(region_id)); + + let handle_open = tokio::spawn(async move { + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: "empty".to_string(), + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + }); + + // Wait for conditions + while opening_regions.sender_len(region_id) != 2 { + tokio::time::sleep(Duration::from_millis(100)).await; + } + + let senders = opening_regions.remove_sender(region_id); + for sender in senders { + sender.send(Err(error::RegionNotFoundSnafu { region_id }.build())); + } + + assert_eq!( + handle_open.await.unwrap().unwrap_err().status_code(), + StatusCode::RegionNotFound + ); + assert_eq!( + rx.await.unwrap().unwrap_err().status_code(), + StatusCode::RegionNotFound + ); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 281ea7130c6d..400284fdf124 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -722,6 +722,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to open region"))] + OpenRegion { + #[snafu(implicit)] + location: Location, + source: Arc, + }, } pub type Result = std::result::Result; @@ -783,6 +790,7 @@ impl ErrorExt for Error { | Recv { .. } | EncodeWal { .. } | DecodeWal { .. } => StatusCode::Internal, + OpenRegion { source, .. } => source.status_code(), WriteBuffer { source, .. } => source.status_code(), WriteGroup { source, .. } => source.status_code(), FieldTypeMismatch { source, .. } => source.status_code(), diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 71006324d98d..c9930d2d04a7 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -18,6 +18,7 @@ pub(crate) mod opener; pub mod options; pub(crate) mod version; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; @@ -35,7 +36,7 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionControlRef, VersionRef}; -use crate::request::OnFailure; +use crate::request::{OnFailure, OptionOutputTx}; use crate::sst::file_purger::FilePurgerRef; use crate::time_provider::TimeProviderRef; @@ -471,6 +472,60 @@ impl RegionMap { pub(crate) type RegionMapRef = Arc; +/// Opening regions +#[derive(Debug, Default)] +pub(crate) struct OpeningRegions { + regions: RwLock>>, +} + +impl OpeningRegions { + /// Registers `sender` for an opening region; Otherwise, it returns `None`. + pub(crate) fn wait_for_opening_region( + &self, + region_id: RegionId, + sender: OptionOutputTx, + ) -> Option { + let mut regions = self.regions.write().unwrap(); + match regions.entry(region_id) { + Entry::Occupied(mut senders) => { + senders.get_mut().push(sender); + None + } + Entry::Vacant(_) => Some(sender), + } + } + + /// Returns true if the region exists. + pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool { + let regions = self.regions.read().unwrap(); + regions.contains_key(®ion_id) + } + + /// Inserts a new region into the map. + pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) { + let mut regions = self.regions.write().unwrap(); + regions.insert(region, vec![sender]); + } + + /// Remove region by id. + pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec { + let mut regions = self.regions.write().unwrap(); + regions.remove(®ion_id).unwrap_or_default() + } + + #[cfg(test)] + pub(crate) fn sender_len(&self, region_id: RegionId) -> usize { + let regions = self.regions.read().unwrap(); + if let Some(senders) = regions.get(®ion_id) { + senders.len() + } else { + 0 + } + } +} + +pub(crate) type OpeningRegionsRef = Arc; + #[cfg(test)] mod tests { use crossbeam_utils::atomic::AtomicCell; diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index a36493f300fb..0b3b8282833c 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -49,7 +49,7 @@ pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>; /// Write ahead log. /// /// All regions in the engine shares the same WAL instance. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Wal { /// The underlying log store. store: Arc, @@ -62,6 +62,14 @@ impl Wal { } } +impl Clone for Wal { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + } + } +} + impl Wal { /// Returns a writer to write to the WAL. pub fn writer(&self) -> WalWriter { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index ad243afdd73c..2122a052c839 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -51,7 +51,7 @@ use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; use crate::metrics::WRITE_STALL_TOTAL; -use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; +use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, }; @@ -212,6 +212,11 @@ impl WorkerGroup { self.worker(region_id).is_region_exists(region_id) } + /// Returns true if the specific region is opening. + pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool { + self.worker(region_id).is_region_opening(region_id) + } + /// Returns region of specific `region_id`. /// /// This method should not be public. @@ -225,7 +230,7 @@ impl WorkerGroup { } /// Get worker for specific `region_id`. - fn worker(&self, region_id: RegionId) -> &RegionWorker { + pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker { let index = region_id_to_index(region_id, self.workers.len()); &self.workers[index] @@ -364,6 +369,7 @@ impl WorkerStarter { /// Starts a region worker and its background thread. fn start(self) -> RegionWorker { let regions = Arc::new(RegionMap::default()); + let opening_regions = Arc::new(OpeningRegions::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); @@ -373,6 +379,7 @@ impl WorkerStarter { config: self.config.clone(), regions: regions.clone(), dropping_regions: Arc::new(RegionMap::default()), + opening_regions: opening_regions.clone(), sender: sender.clone(), receiver, wal: Wal::new(self.log_store), @@ -409,6 +416,7 @@ impl WorkerStarter { RegionWorker { id: self.id, regions, + opening_regions, sender, handle: Mutex::new(Some(handle)), running, @@ -422,6 +430,8 @@ pub(crate) struct RegionWorker { id: WorkerId, /// Regions bound to the worker. regions: RegionMapRef, + /// The opening regions. + opening_regions: OpeningRegionsRef, /// Request sender. sender: Sender, /// Handle to the worker thread. @@ -481,10 +491,21 @@ impl RegionWorker { self.regions.is_region_exists(region_id) } + /// Returns true if the region is opening. + fn is_region_opening(&self, region_id: RegionId) -> bool { + self.opening_regions.is_region_exists(region_id) + } + /// Returns region of specific `region_id`. fn get_region(&self, region_id: RegionId) -> Option { self.regions.get_region(region_id) } + + #[cfg(test)] + /// Returns the [OpeningRegionsRef]. + pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef { + &self.opening_regions + } } impl Drop for RegionWorker { @@ -531,6 +552,8 @@ struct RegionWorkerLoop { regions: RegionMapRef, /// Regions that are not yet fully dropped. dropping_regions: RegionMapRef, + /// Regions that are opening. + opening_regions: OpeningRegionsRef, /// Request sender. sender: Sender, /// Request receiver. @@ -698,7 +721,11 @@ impl RegionWorkerLoop { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, - DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await, + DdlRequest::Open(req) => { + self.handle_open_request(ddl.region_id, req, ddl.sender) + .await; + continue; + } DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, DdlRequest::Alter(req) => { self.handle_alter_request(ddl.region_id, req, ddl.sender) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index bcd050220e61..6c63abf1535c 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -20,24 +20,24 @@ use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::{AffectedRows, RegionOpenRequest}; +use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; -use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{ + ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result, +}; use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; +use crate::request::OptionOutputTx; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; impl RegionWorkerLoop { - pub(crate) async fn handle_open_request( - &mut self, + async fn check_and_cleanup_region( + &self, region_id: RegionId, - request: RegionOpenRequest, - ) -> Result { - if self.regions.is_region_exists(region_id) { - return Ok(0); - } + request: &RegionOpenRequest, + ) -> Result<()> { let object_store = if let Some(storage_name) = request.options.get("storage") { self.object_store_manager .find(storage_name) @@ -59,10 +59,33 @@ impl RegionWorkerLoop { return RegionNotFoundSnafu { region_id }.fail(); } + Ok(()) + } + + pub(crate) async fn handle_open_request( + &mut self, + region_id: RegionId, + request: RegionOpenRequest, + sender: OptionOutputTx, + ) { + if self.regions.is_region_exists(region_id) { + sender.send(Ok(0)); + return; + } + let Some(sender) = self + .opening_regions + .wait_for_opening_region(region_id, sender) + else { + return; + }; + if let Err(err) = self.check_and_cleanup_region(region_id, &request).await { + sender.send(Err(err)); + return; + } info!("Try to open region {}", region_id); // Open region from specific region dir. - let region = RegionOpener::new( + let opener = match RegionOpener::new( region_id, &request.region_dir, self.memtable_builder_provider.clone(), @@ -71,18 +94,43 @@ impl RegionWorkerLoop { self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) - .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) - .open(&self.config, &self.wal) - .await?; - - info!("Region {} is opened", region_id); + .parse_options(request.options) + { + Ok(opener) => opener, + Err(err) => { + sender.send(Err(err)); + return; + } + }; - REGION_COUNT.inc(); + let regions = self.regions.clone(); + let wal = self.wal.clone(); + let config = self.config.clone(); + let opening_regions = self.opening_regions.clone(); + opening_regions.insert_sender(region_id, sender); + common_runtime::spawn_bg(async move { + match opener.open(&config, &wal).await { + Ok(region) => { + info!("Region {} is opened", region_id); + REGION_COUNT.inc(); - // Insert the MitoRegion into the RegionMap. - self.regions.insert_region(Arc::new(region)); + // Insert the Region into the RegionMap. + regions.insert_region(Arc::new(region)); - Ok(0) + let senders = opening_regions.remove_sender(region_id); + for sender in senders { + sender.send(Ok(0)); + } + } + Err(err) => { + let senders = opening_regions.remove_sender(region_id); + let err = Arc::new(err); + for sender in senders { + sender.send(Err(err.clone()).context(OpenRegionSnafu)); + } + } + } + }); } }