diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 71006324d98d..b679365464f4 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -18,7 +18,7 @@ pub(crate) mod opener; pub mod options; pub(crate) mod version; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; @@ -471,6 +471,34 @@ impl RegionMap { pub(crate) type RegionMapRef = Arc; +/// Opening regions +#[derive(Debug, Default)] +pub(crate) struct OpeningRegions { + regions: RwLock>, +} + +impl OpeningRegions { + /// Returns true if the region exists. + pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool { + let regions = self.regions.read().unwrap(); + regions.contains(®ion_id) + } + + /// Inserts a new region into the map. + pub(crate) fn insert_region(&self, region: RegionId) { + let mut regions = self.regions.write().unwrap(); + regions.insert(region); + } + + /// Remove region by id. + pub(crate) fn remove_region(&self, region_id: RegionId) { + let mut regions = self.regions.write().unwrap(); + regions.remove(®ion_id); + } +} + +pub(crate) type OpeningRegionsRef = Arc; + #[cfg(test)] mod tests { use crossbeam_utils::atomic::AtomicCell; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index fbd6656e2e6c..522b3acbbabf 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -51,7 +51,7 @@ use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; use crate::metrics::WRITE_STALL_TOTAL; -use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; +use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, }; @@ -373,6 +373,7 @@ impl WorkerStarter { config: self.config.clone(), regions: regions.clone(), dropping_regions: Arc::new(RegionMap::default()), + opening_regions: Arc::new(OpeningRegions::default()), sender: sender.clone(), receiver, wal: Wal::new(self.log_store), @@ -531,6 +532,8 @@ struct RegionWorkerLoop { regions: RegionMapRef, /// Regions that are not yet fully dropped. dropping_regions: RegionMapRef, + /// Regions that are opening. + opening_regions: OpeningRegionsRef, /// Request sender. sender: Sender, /// Request receiver. diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 6c26d7d8bdd1..b8b7119abde9 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -66,6 +66,10 @@ impl RegionWorkerLoop { request: RegionOpenRequest, sender: OptionOutputTx, ) { + if self.opening_regions.is_region_exists(region_id) { + sender.send(Ok(0)); + return; + }; if self.regions.is_region_exists(region_id) { sender.send(Ok(0)); return; @@ -86,9 +90,10 @@ impl RegionWorkerLoop { self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) + .cache(Some(self.cache_manager.clone())) .parse_options(request.options) { - Ok(opener) => opener.cache(Some(self.cache_manager.clone())), + Ok(opener) => opener, Err(err) => { sender.send(Err(err)); return; @@ -98,6 +103,8 @@ impl RegionWorkerLoop { let regions = self.regions.clone(); let wal = self.wal.clone(); let config = self.config.clone(); + let opening_regions = self.opening_regions.clone(); + opening_regions.insert_region(region_id); common_runtime::spawn_bg(async move { match opener.open(&config, &wal).await { Ok(region) => { @@ -113,6 +120,7 @@ impl RegionWorkerLoop { sender.send(Err(err)); } } + opening_regions.remove_region(region_id); }); } }