Skip to content

Commit

Permalink
refactor: use get_region_or/writable_region_or
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 12, 2023
1 parent 1905dbf commit a7978e9
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 59 deletions.
18 changes: 13 additions & 5 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ impl RegionMap {
}

/// Gets region by region id or call the failure callback.
pub(crate) fn get_region_or_fail<F: OnFailure>(&self, region_id: RegionId, cb: &mut F) -> Option<MitoRegionRef> {
pub(crate) fn get_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
let region_opt = self.get_region(region_id);
if region_opt.is_none() {
cb.on_failure(RegionNotFoundSnafu { region_id }.build());
Expand All @@ -147,7 +151,7 @@ impl RegionMap {
/// Gets writable region by region id.
///
/// Returns error if the region does not exist or is readonly.
pub(crate) fn get_writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
Expand All @@ -158,13 +162,17 @@ impl RegionMap {
/// Gets writable region by region id.
///
/// Calls the callback if the region does not exist or is readonly.
pub(crate) fn get_writable_region_or_fail<F: OnFailure>(&self, region_id: RegionId, cb: &mut F) -> Option<MitoRegionRef> {
match self.get_writable_region(region_id) {
pub(crate) fn writable_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
match self.writable_region(region_id) {
Ok(region) => Some(region),
Err(e) => {
cb.on_failure(e);
None
},
}
}
}

Expand Down
32 changes: 21 additions & 11 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ impl From<Sender<Result<Output>>> for OptionOutputTx {
}
}

impl OnFailure for OptionOutputTx {
fn on_failure(&mut self, err: Error) {
self.send_mut(Err(err));
}
}

/// Callback on failure.
pub(crate) trait OnFailure {
/// Handles `err` on failure.
Expand Down Expand Up @@ -603,27 +609,29 @@ pub(crate) struct FlushFinished {
}

impl FlushFinished {
pub(crate) fn on_failure(self, err: Error) {
let err = Arc::new(err);
pub(crate) fn on_success(self) {
for sender in self.senders {
sender.send(Ok(Output::AffectedRows(0)));
}
}
}

impl OnFailure for FlushFinished {
fn on_failure(&mut self, err: Error) {
let err = Arc::new(err);
for sender in self.senders.drain(..) {
sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region_id,
}));
}
// Clean flushed files.
for file in self.file_metas {
for file in &self.file_metas {
self.file_purger.send_request(PurgeRequest {
region_id: file.region_id,
file_id: file.file_id,
});
}
}

pub(crate) fn on_success(self) {
for sender in self.senders {
sender.send(Ok(Output::AffectedRows(0)));
}
}
}

/// Notifies a flush job is failed.
Expand Down Expand Up @@ -653,11 +661,13 @@ impl CompactionFinished {
self.sender.send(Ok(AffectedRows(0)));
info!("Successfully compacted region: {}", self.region_id);
}
}

impl OnFailure for CompactionFinished {
/// Compaction succeeded but failed to update manifest or region's already been dropped,
/// clean compaction output files.
pub fn on_failure(self, err: Error) {
self.sender.send(Err(err));
fn on_failure(&mut self, err: Error) {
self.sender.send_mut(Err(err));
for file in &self.compacted_files {
let file_id = file.file_id;
warn!(
Expand Down
7 changes: 3 additions & 4 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataR
use store_api::region_request::RegionAlterRequest;
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result};
use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableBuilderRef;
Expand All @@ -37,10 +37,9 @@ impl<S> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionAlterRequest,
sender: OptionOutputTx,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.get_region(region_id) else {
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
let Some(region) = self.regions.get_region_or(region_id, &mut sender) else {
return;
};

Expand Down
22 changes: 7 additions & 15 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@ use store_api::storage::RegionId;
use crate::compaction::CompactionRequest;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OptionOutputTx};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
pub(crate) fn handle_compaction_request(
&mut self,
region_id: RegionId,
sender: OptionOutputTx,
mut sender: OptionOutputTx,
) {
let region = match self.regions.get_writable_region(region_id) {
Ok(v) => v,
Err(e) => {
send_result(sender, Err(e));
return;
}
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};

let request = self.new_compaction_request(&region, sender);
Expand All @@ -54,12 +50,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: CompactionFinished,
) {
let region = match self.regions.get_writable_region(region_id) {
Ok(v) => v,
Err(e) => {
request.on_failure(e);
return;
}
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288)

impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result<Output> {
let region = self.regions.get_writable_region(region_id)?;
let region = self.regions.writable_region(region_id)?;

info!("Try to drop region: {}", region_id);

Expand Down
23 changes: 7 additions & 16 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::error::Result;
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OptionOutputTx};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
Expand All @@ -33,13 +33,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: FlushFinished,
) {
let region = match self.regions.get_writable_region(region_id) {
Ok(v) => v,
Err(e) => {
// We may dropped or closed the region. The region may be readonly.
request.on_failure(e);
return;
}
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
Expand Down Expand Up @@ -102,14 +97,10 @@ impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_flush_request(
&mut self,
region_id: RegionId,
sender: OptionOutputTx,
mut sender: OptionOutputTx,
) {
let region = match self.regions.get_writable_region(region_id) {
Ok(v) => v,
Err(e) => {
send_result(sender, Err(e));
return;
}
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};

let mut task = self.new_flush_task(&region, FlushReason::Manual);
Expand Down
13 changes: 6 additions & 7 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,12 @@ impl<S> RegionWorkerLoop<S> {

// Checks whether the region exists and is it stalling.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let region = match self.regions.get_writable_region(region_id) {
Ok(v) => v,
Err(e) => {
// No such region or the region is read only.
send_result(sender_req.sender, Err(e));
continue;
}
let Some(region) = self
.regions
.writable_region_or(region_id, &mut sender_req.sender)
else {
// No such region or the region is read only.
continue;
};

let region_ctx = RegionWriteCtx::new(region.region_id, &region.version_control);
Expand Down

0 comments on commit a7978e9

Please sign in to comment.