Skip to content

Commit

Permalink
feat: trace opening regions
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 27, 2024
1 parent f7fb14b commit f0ed8e2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
30 changes: 29 additions & 1 deletion src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -471,6 +471,34 @@ impl RegionMap {

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
regions: RwLock<HashSet<RegionId>>,
}

impl OpeningRegions {
/// Returns true if the region exists.
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
let regions = self.regions.read().unwrap();
regions.contains(&region_id)
}

/// Inserts a new region into the map.
pub(crate) fn insert_region(&self, region: RegionId) {
let mut regions = self.regions.write().unwrap();
regions.insert(region);
}

/// Remove region by id.
pub(crate) fn remove_region(&self, region_id: RegionId) {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id);
}
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

#[cfg(test)]
mod tests {
use crossbeam_utils::atomic::AtomicCell;
Expand Down
5 changes: 4 additions & 1 deletion src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
Expand Down Expand Up @@ -373,6 +373,7 @@ impl<S: LogStore> WorkerStarter<S> {
config: self.config.clone(),
regions: regions.clone(),
dropping_regions: Arc::new(RegionMap::default()),
opening_regions: Arc::new(OpeningRegions::default()),
sender: sender.clone(),
receiver,
wal: Wal::new(self.log_store),
Expand Down Expand Up @@ -531,6 +532,8 @@ struct RegionWorkerLoop<S> {
regions: RegionMapRef,
/// Regions that are not yet fully dropped.
dropping_regions: RegionMapRef,
/// Regions that are opening.
opening_regions: OpeningRegionsRef,
/// Request sender.
sender: Sender<WorkerRequest>,
/// Request receiver.
Expand Down
10 changes: 9 additions & 1 deletion src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
request: RegionOpenRequest,
sender: OptionOutputTx,
) {
if self.opening_regions.is_region_exists(region_id) {
sender.send(Ok(0));
return;
};
if self.regions.is_region_exists(region_id) {
sender.send(Ok(0));
return;
Expand All @@ -86,9 +90,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.intermediate_manager.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.parse_options(request.options)
{
Ok(opener) => opener.cache(Some(self.cache_manager.clone())),
Ok(opener) => opener,
Err(err) => {
sender.send(Err(err));
return;
Expand All @@ -98,6 +103,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let regions = self.regions.clone();
let wal = self.wal.clone();
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
opening_regions.insert_region(region_id);
common_runtime::spawn_bg(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
Expand All @@ -113,6 +120,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender.send(Err(err));
}
}
opening_regions.remove_region(region_id);
});
}
}

0 comments on commit f0ed8e2

Please sign in to comment.