diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index a36493f300fb..0b3b8282833c 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -49,7 +49,7 @@ pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>; /// Write ahead log. /// /// All regions in the engine shares the same WAL instance. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Wal { /// The underlying log store. store: Arc, @@ -62,6 +62,14 @@ impl Wal { } } +impl Clone for Wal { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + } + } +} + impl Wal { /// Returns a writer to write to the WAL. pub fn writer(&self) -> WalWriter { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index ad243afdd73c..fbd6656e2e6c 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -698,7 +698,11 @@ impl RegionWorkerLoop { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, - DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await, + DdlRequest::Open(req) => { + self.handle_open_request(ddl.region_id, req, ddl.sender) + .await; + continue; + } DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, DdlRequest::Alter(req) => { self.handle_alter_request(ddl.region_id, req, ddl.sender) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index bcd050220e61..acc01b5edf26 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -20,24 +20,18 @@ use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::{AffectedRows, RegionOpenRequest}; +use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; +use crate::request::OptionOutputTx; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; impl RegionWorkerLoop { - pub(crate) async fn handle_open_request( - &mut self, - region_id: RegionId, - request: RegionOpenRequest, - ) -> Result { - if self.regions.is_region_exists(region_id) { - return Ok(0); - } + async fn check_region(&self, region_id: RegionId, request: &RegionOpenRequest) -> Result<()> { let object_store = if let Some(storage_name) = request.options.get("storage") { self.object_store_manager .find(storage_name) @@ -59,10 +53,27 @@ impl RegionWorkerLoop { return RegionNotFoundSnafu { region_id }.fail(); } + Ok(()) + } + + pub(crate) async fn handle_open_request( + &mut self, + region_id: RegionId, + request: RegionOpenRequest, + sender: OptionOutputTx, + ) { + if self.regions.is_region_exists(region_id) { + sender.send(Ok(0)); + return; + } + if let Err(err) = self.check_region(region_id, &request).await { + sender.send(Err(err)); + return; + } info!("Try to open region {}", region_id); // Open region from specific region dir. - let region = RegionOpener::new( + let opener = match RegionOpener::new( region_id, &request.region_dir, self.memtable_builder_provider.clone(), @@ -71,18 +82,33 @@ impl RegionWorkerLoop { self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) - .parse_options(request.options)? - .cache(Some(self.cache_manager.clone())) - .open(&self.config, &self.wal) - .await?; - - info!("Region {} is opened", region_id); - + .parse_options(request.options) + { + Ok(opener) => opener.cache(Some(self.cache_manager.clone())), + Err(err) => { + sender.send(Err(err)); + return; + } + }; REGION_COUNT.inc(); - // Insert the MitoRegion into the RegionMap. - self.regions.insert_region(Arc::new(region)); + let regions = self.regions.clone(); + let wal = self.wal.clone(); + let config = self.config.clone(); + common_runtime::spawn_bg(async move { + match opener.open(&config, &wal).await { + Ok(region) => { + info!("Region {} is opened", region_id); + + // Insert the MitoRegion into the RegionMap. + regions.insert_region(Arc::new(region)); - Ok(0) + sender.send(Ok(0)); + } + Err(err) => { + sender.send(Err(err)); + } + } + }); } }