From a7978e980acf50c55d67fa022a072291a94702c6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 12 Sep 2023 16:00:26 +0800 Subject: [PATCH] refactor: use get_region_or/writable_region_or --- src/mito2/src/region.rs | 18 +++++++++---- src/mito2/src/request.rs | 32 +++++++++++++++-------- src/mito2/src/worker/handle_alter.rs | 7 +++-- src/mito2/src/worker/handle_compaction.rs | 22 +++++----------- src/mito2/src/worker/handle_drop.rs | 2 +- src/mito2/src/worker/handle_flush.rs | 23 +++++----------- src/mito2/src/worker/handle_write.rs | 13 +++++---- 7 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index dbf27b1a5036..6040378b770d 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -136,7 +136,11 @@ impl RegionMap { } /// Gets region by region id or call the failure callback. - pub(crate) fn get_region_or_fail(&self, region_id: RegionId, cb: &mut F) -> Option { + pub(crate) fn get_region_or( + &self, + region_id: RegionId, + cb: &mut F, + ) -> Option { let region_opt = self.get_region(region_id); if region_opt.is_none() { cb.on_failure(RegionNotFoundSnafu { region_id }.build()); @@ -147,7 +151,7 @@ impl RegionMap { /// Gets writable region by region id. /// /// Returns error if the region does not exist or is readonly. - pub(crate) fn get_writable_region(&self, region_id: RegionId) -> Result { + pub(crate) fn writable_region(&self, region_id: RegionId) -> Result { let region = self .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; @@ -158,13 +162,17 @@ impl RegionMap { /// Gets writable region by region id. /// /// Calls the callback if the region does not exist or is readonly. - pub(crate) fn get_writable_region_or_fail(&self, region_id: RegionId, cb: &mut F) -> Option { - match self.get_writable_region(region_id) { + pub(crate) fn writable_region_or( + &self, + region_id: RegionId, + cb: &mut F, + ) -> Option { + match self.writable_region(region_id) { Ok(region) => Some(region), Err(e) => { cb.on_failure(e); None - }, + } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 1f635844b1da..3e432c966c8e 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -451,6 +451,12 @@ impl From>> for OptionOutputTx { } } +impl OnFailure for OptionOutputTx { + fn on_failure(&mut self, err: Error) { + self.send_mut(Err(err)); + } +} + /// Callback on failure. pub(crate) trait OnFailure { /// Handles `err` on failure. @@ -603,27 +609,29 @@ pub(crate) struct FlushFinished { } impl FlushFinished { - pub(crate) fn on_failure(self, err: Error) { - let err = Arc::new(err); + pub(crate) fn on_success(self) { for sender in self.senders { + sender.send(Ok(Output::AffectedRows(0))); + } + } +} + +impl OnFailure for FlushFinished { + fn on_failure(&mut self, err: Error) { + let err = Arc::new(err); + for sender in self.senders.drain(..) { sender.send(Err(err.clone()).context(FlushRegionSnafu { region_id: self.region_id, })); } // Clean flushed files. - for file in self.file_metas { + for file in &self.file_metas { self.file_purger.send_request(PurgeRequest { region_id: file.region_id, file_id: file.file_id, }); } } - - pub(crate) fn on_success(self) { - for sender in self.senders { - sender.send(Ok(Output::AffectedRows(0))); - } - } } /// Notifies a flush job is failed. @@ -653,11 +661,13 @@ impl CompactionFinished { self.sender.send(Ok(AffectedRows(0))); info!("Successfully compacted region: {}", self.region_id); } +} +impl OnFailure for CompactionFinished { /// Compaction succeeded but failed to update manifest or region's already been dropped, /// clean compaction output files. - pub fn on_failure(self, err: Error) { - self.sender.send(Err(err)); + fn on_failure(&mut self, err: Error) { + self.sender.send_mut(Err(err)); for file in &self.compacted_files { let file_id = file.file_id; warn!( diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 65220319b94d..27a48374ebd1 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -23,7 +23,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataR use store_api::region_request::RegionAlterRequest; use store_api::storage::RegionId; -use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result}; use crate::flush::FlushReason; use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; use crate::memtable::MemtableBuilderRef; @@ -37,10 +37,9 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionAlterRequest, - sender: OptionOutputTx, + mut sender: OptionOutputTx, ) { - let Some(region) = self.regions.get_region(region_id) else { - send_result(sender, RegionNotFoundSnafu { region_id }.fail()); + let Some(region) = self.regions.get_region_or(region_id, &mut sender) else { return; }; diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index d204817a5b87..aba0ba3d9625 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -19,22 +19,18 @@ use store_api::storage::RegionId; use crate::compaction::CompactionRequest; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; -use crate::request::{CompactionFailed, CompactionFinished, OptionOutputTx}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// Handles compaction request submitted to region worker. pub(crate) fn handle_compaction_request( &mut self, region_id: RegionId, - sender: OptionOutputTx, + mut sender: OptionOutputTx, ) { - let region = match self.regions.get_writable_region(region_id) { - Ok(v) => v, - Err(e) => { - send_result(sender, Err(e)); - return; - } + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { + return; }; let request = self.new_compaction_request(®ion, sender); @@ -54,12 +50,8 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: CompactionFinished, ) { - let region = match self.regions.get_writable_region(region_id) { - Ok(v) => v, - Err(e) => { - request.on_failure(e); - return; - } + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { + return; }; // Write region edit to manifest. diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 017f5bd214f9..c8108082738f 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -35,7 +35,7 @@ const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { - let region = self.regions.get_writable_region(region_id)?; + let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 0fda9d03cc5d..6a54781ac44a 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -23,8 +23,8 @@ use crate::error::Result; use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; -use crate::request::{FlushFailed, FlushFinished, OptionOutputTx}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// On region flush job finished. @@ -33,13 +33,8 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: FlushFinished, ) { - let region = match self.regions.get_writable_region(region_id) { - Ok(v) => v, - Err(e) => { - // We may dropped or closed the region. The region may be readonly. - request.on_failure(e); - return; - } + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { + return; }; // Write region edit to manifest. @@ -102,14 +97,10 @@ impl RegionWorkerLoop { pub(crate) async fn handle_flush_request( &mut self, region_id: RegionId, - sender: OptionOutputTx, + mut sender: OptionOutputTx, ) { - let region = match self.regions.get_writable_region(region_id) { - Ok(v) => v, - Err(e) => { - send_result(sender, Err(e)); - return; - } + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { + return; }; let mut task = self.new_flush_task(®ion, FlushReason::Manual); diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 0f6b5f806960..3b9dad45aed8 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -102,13 +102,12 @@ impl RegionWorkerLoop { // Checks whether the region exists and is it stalling. if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { - let region = match self.regions.get_writable_region(region_id) { - Ok(v) => v, - Err(e) => { - // No such region or the region is read only. - send_result(sender_req.sender, Err(e)); - continue; - } + let Some(region) = self + .regions + .writable_region_or(region_id, &mut sender_req.sender) + else { + // No such region or the region is read only. + continue; }; let region_ctx = RegionWriteCtx::new(region.region_id, ®ion.version_control);