Skip to content

Commit

Permalink
feat: open region in background
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 27, 2024
1 parent c78043d commit f7fb14b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
10 changes: 9 additions & 1 deletion src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
/// Write ahead log.
///
/// All regions in the engine shares the same WAL instance.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Wal<S> {
/// The underlying log store.
store: Arc<S>,
Expand All @@ -62,6 +62,14 @@ impl<S> Wal<S> {
}
}

impl<S> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
}
}
}

impl<S: LogStore> Wal<S> {
/// Returns a writer to write to the WAL.
pub fn writer(&self) -> WalWriter<S> {
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let res = match ddl.request {
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await,
DdlRequest::Open(req) => {
self.handle_open_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
DdlRequest::Alter(req) => {
self.handle_alter_request(ddl.region_id, req, ddl.sender)
Expand Down
68 changes: 49 additions & 19 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,22 @@ use common_telemetry::info;
use object_store::util::join_path;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::{AffectedRows, RegionOpenRequest};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;

use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
use crate::worker::handle_drop::remove_region_dir_once;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_open_request(
&mut self,
async fn check_and_cleanup_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
if self.regions.is_region_exists(region_id) {
return Ok(0);
}
request: &RegionOpenRequest,
) -> Result<()> {
let object_store = if let Some(storage_name) = request.options.get("storage") {
self.object_store_manager
.find(storage_name)
Expand All @@ -59,10 +57,27 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return RegionNotFoundSnafu { region_id }.fail();
}

Ok(())
}

pub(crate) async fn handle_open_request(
&mut self,
region_id: RegionId,
request: RegionOpenRequest,
sender: OptionOutputTx,
) {
if self.regions.is_region_exists(region_id) {
sender.send(Ok(0));
return;
}
if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
sender.send(Err(err));
return;
}
info!("Try to open region {}", region_id);

// Open region from specific region dir.
let region = RegionOpener::new(
let opener = match RegionOpener::new(
region_id,
&request.region_dir,
self.memtable_builder_provider.clone(),
Expand All @@ -71,18 +86,33 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.intermediate_manager.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
.parse_options(request.options)?
.cache(Some(self.cache_manager.clone()))
.open(&self.config, &self.wal)
.await?;

info!("Region {} is opened", region_id);
.parse_options(request.options)
{
Ok(opener) => opener.cache(Some(self.cache_manager.clone())),
Err(err) => {
sender.send(Err(err));
return;
}
};

REGION_COUNT.inc();
let regions = self.regions.clone();
let wal = self.wal.clone();
let config = self.config.clone();
common_runtime::spawn_bg(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
REGION_COUNT.inc();

// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));
// Insert the MitoRegion into the RegionMap.
regions.insert_region(Arc::new(region));

Ok(0)
sender.send(Ok(0));
}
Err(err) => {
sender.send(Err(err));
}
}
});
}
}

0 comments on commit f7fb14b

Please sign in to comment.