From 17e5cc75e4b5e41c917658a26ce7cedbdce8e870 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 14 Sep 2023 17:41:48 +0800 Subject: [PATCH 01/14] feat: allow multiple waiters in compaction request --- src/mito2/src/compaction.rs | 14 ++++++++++++-- src/mito2/src/compaction/twcs.rs | 23 +++++++++++++---------- src/mito2/src/flush.rs | 1 + src/mito2/src/request.rs | 18 +++++++++++++----- src/mito2/src/worker/handle_compaction.rs | 12 +++++------- 5 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 08b64432a7d8..978c93769b9c 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -30,7 +30,7 @@ use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; use crate::error::Result; use crate::region::version::VersionRef; -use crate::request::{OptionOutputTx, WorkerRequest}; +use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; @@ -40,8 +40,10 @@ pub struct CompactionRequest { pub(crate) access_layer: AccessLayerRef, pub(crate) ttl: Option, pub(crate) compaction_time_window: Option, + /// Sender to send notification to the region worker. pub(crate) request_sender: mpsc::Sender, - pub(crate) waiter: OptionOutputTx, + /// Waiters of the compaction request. + pub(crate) waiters: Vec, pub(crate) file_purger: FilePurgerRef, } @@ -49,6 +51,13 @@ impl CompactionRequest { pub(crate) fn region_id(&self) -> RegionId { self.current_version.metadata.region_id } + + /// Push waiter to the request. + pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) { + if let Some(waiter) = waiter.take_inner() { + self.waiters.push(waiter); + } + } } /// Builds compaction picker according to [CompactionStrategy]. @@ -62,6 +71,7 @@ pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> Compactio } } +/// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, // TODO(hl): maybe tracks region compaction status in CompactionScheduler diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 1b6293b9c82a..9812e2c00f5d 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -35,7 +35,7 @@ use crate::compaction::CompactionRequest; use crate::error; use crate::error::CompactRegionSnafu; use crate::request::{ - BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest, + BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; @@ -123,7 +123,7 @@ impl Picker for TwcsPicker { ttl, compaction_time_window, request_sender, - waiter, + waiters, file_purger, } = req; @@ -156,8 +156,10 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { - // Nothing to compact. - waiter.send(Ok(Output::AffectedRows(0))); + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. 
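// (`waiters` is the `Vec<OutputTx>` taken out of the request above; sending consumes
// each sender, so every caller that asked for this compaction still gets a reply even
// though no task is created.)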
+ for waiter in waiters { + waiter.send(Ok(Output::AffectedRows(0))); + } return None; } let task = TwcsCompactionTask { @@ -169,7 +171,7 @@ impl Picker for TwcsPicker { sst_write_buffer_size: ReadableSize::mb(4), compaction_time_window: None, request_sender, - sender: waiter, + waiters, file_purger, }; Some(Box::new(task)) @@ -227,8 +229,8 @@ pub(crate) struct TwcsCompactionTask { pub file_purger: FilePurgerRef, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, - /// Sender that are used to notify waiters waiting for pending compaction tasks. - pub sender: OptionOutputTx, + /// Senders that are used to notify waiters waiting for pending compaction tasks. + pub waiters: Vec, } impl Debug for TwcsCompactionTask { @@ -321,10 +323,11 @@ impl TwcsCompactionTask { /// Handles compaction failure, notifies all waiters. fn on_failure(&mut self, err: Arc) { - self.sender - .send_mut(Err(err.clone()).context(CompactRegionSnafu { + for waiter in self.waiters.drain(..) { + waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id: self.region_id, })); + } } /// Notifies region worker to handle post-compaction tasks. @@ -352,7 +355,7 @@ impl CompactionTask for TwcsCompactionTask { region_id: self.region_id, compaction_outputs: added, compacted_files: deleted, - sender: self.sender.take(), + senders: std::mem::take(&mut self.waiters), file_purger: self.file_purger.clone(), }) } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 1ae246ed208e..8d50e169e367 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -551,6 +551,7 @@ impl Drop for FlushScheduler { struct FlushStatus { /// Current region. region: MitoRegionRef, + // TODO(yingwen): We can remove this flag. /// There is a flush task running. flushing: bool, /// Task waiting for next flush. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f4b4352cc493..1fb24446909a 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -42,7 +42,8 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{ - CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result, + CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, + InvalidRequestSnafu, Result, }; use crate::memtable::MemtableId; use crate::sst::file::FileMeta; @@ -661,15 +662,17 @@ pub(crate) struct CompactionFinished { pub(crate) compaction_outputs: Vec, /// Compacted files that are to be removed from region version. pub(crate) compacted_files: Vec, - /// Compaction result sender. - pub(crate) sender: OptionOutputTx, + /// Compaction result senders. + pub(crate) senders: Vec, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, } impl CompactionFinished { pub fn on_success(self) { - self.sender.send(Ok(AffectedRows(0))); + for sender in self.senders { + sender.send(Ok(AffectedRows(0))); + } info!("Successfully compacted region: {}", self.region_id); } } @@ -678,7 +681,12 @@ impl OnFailure for CompactionFinished { /// Compaction succeeded but failed to update manifest or region's already been dropped, /// clean compaction output files. fn on_failure(&mut self, err: Error) { - self.sender.send_mut(Err(err)); + let err = Arc::new(err); + for sender in self.senders.drain(..) 
{ + sender.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } for file in &self.compacted_files { let file_id = file.file_id; warn!( diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 101e9011ebad..2c9038ad0db9 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -33,7 +33,9 @@ impl RegionWorkerLoop { return; }; - let request = self.new_compaction_request(®ion, sender); + let mut request = self.new_compaction_request(®ion); + // Push waiter to the request. + request.push_waiter(sender); if let Err(e) = self.compaction_scheduler.schedule_compaction(request) { error!(e; "Failed to schedule compaction task for region: {}", region_id); } else { @@ -82,11 +84,7 @@ impl RegionWorkerLoop { } /// Creates a new compaction request. - fn new_compaction_request( - &self, - region: &MitoRegionRef, - waiter: OptionOutputTx, - ) -> CompactionRequest { + fn new_compaction_request(&self, region: &MitoRegionRef) -> CompactionRequest { let current_version = region.version_control.current().version; let access_layer = region.access_layer.clone(); let file_purger = region.file_purger.clone(); @@ -97,7 +95,7 @@ impl RegionWorkerLoop { ttl: None, // TODO(hl): get TTL info from region metadata compaction_time_window: None, // TODO(hl): get persisted region compaction time window request_sender: self.sender.clone(), - waiter, + waiters: Vec::new(), file_purger, } } From 16484a87aea7c52a7142595e5a50abf77b2507fe Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 14 Sep 2023 18:23:23 +0800 Subject: [PATCH 02/14] feat: compaction status wip --- src/mito2/src/compaction.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 978c93769b9c..7bdb67aa814c 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -100,3 +100,15 @@ impl CompactionScheduler { })) } } + +/// Status of running and pending region compaction tasks. +struct CompactionStatus { + // Request waiting for compaction. 
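// (In this work-in-progress commit the status buffers a whole `CompactionRequest`;
// the next commit replaces this field with `PendingCompaction`, which keeps only the
// waiters.)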
+ pending_request: Option, +} + +impl CompactionStatus { + fn merge_request(&mut self, request: CompactionRequest) { + unimplemented!() + } +} From 3b243bb2f1ec5945bf1f6a18989b54f60328b382 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 14 Sep 2023 20:45:29 +0800 Subject: [PATCH 03/14] feat: track region status in compaction scheduler --- src/mito2/src/compaction.rs | 95 ++++++++++++++++++++--- src/mito2/src/compaction/twcs.rs | 2 + src/mito2/src/flush.rs | 2 +- src/mito2/src/worker.rs | 2 +- src/mito2/src/worker/handle_compaction.rs | 27 +------ 5 files changed, 93 insertions(+), 35 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 7bdb67aa814c..c180a79a6c89 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -18,18 +18,20 @@ mod picker; mod test_util; mod twcs; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use common_telemetry::debug; pub use picker::CompactionPickerRef; use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; use crate::error::Result; use crate::region::version::VersionRef; +use crate::region::MitoRegionRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; @@ -74,16 +76,42 @@ pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> Compactio /// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, - // TODO(hl): maybe tracks region compaction status in CompactionScheduler + /// Regions need compaction. + region_status: HashMap, + /// Request sender of the worker that this scheduler belongs to. + request_sender: Sender, } impl CompactionScheduler { - pub(crate) fn new(scheduler: SchedulerRef) -> Self { - Self { scheduler } + pub(crate) fn new(scheduler: SchedulerRef, request_sender: Sender) -> Self { + Self { + scheduler, + region_status: HashMap::new(), + request_sender, + } + } + + /// Schedules a compaction for the region. + pub(crate) fn schedule_compaction( + &mut self, + region: &MitoRegionRef, + waiter: OptionOutputTx, + ) -> Result<()> { + if let Some(status) = self.region_status.get_mut(®ion.region_id) { + // Region is compacting or wait for compaction. Add the waiter to pending list. + status.merge_waiter(waiter); + return Ok(()); + } + + // The region can compact directly. + let req = self.new_compaction_request(region, waiter); + self.submit_request(req) } - /// Schedules a region compaction task. - pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> { + /// Submit a compaction request. + /// + /// Callers must ensure that the region doesn't have compaction task running. + fn submit_request(&self, req: CompactionRequest) -> Result<()> { self.scheduler.schedule(Box::pin(async { // TODO(hl): build picker according to region options. let picker = @@ -93,22 +121,69 @@ impl CompactionScheduler { picker, req.region_id() ); + // FIXME(yingwen): We should remove the region from region status. let Some(mut task) = picker.pick(req) else { return; }; task.run().await; })) } + + /// Creates a new compaction request for compaction picker. 
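    /// The request snapshots the region's current version and clones its access layer,
    /// file purger and the worker's request sender, so the picker and the resulting task
    /// can run outside the region worker loop.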
+ fn new_compaction_request( + &self, + region: &MitoRegionRef, + waiter: OptionOutputTx, + ) -> CompactionRequest { + let current_version = region.version_control.current().version; + let access_layer = region.access_layer.clone(); + let file_purger = region.file_purger.clone(); + + let mut req = CompactionRequest { + current_version, + access_layer, + ttl: None, // TODO(hl): get TTL info from region metadata + compaction_time_window: None, // TODO(hl): get persisted region compaction time window + request_sender: self.request_sender.clone(), + waiters: Vec::new(), + file_purger, + }; + req.push_waiter(waiter); + + req + } +} + +/// Pending compaction tasks. +struct PendingCompaction { + waiters: Vec, +} + +impl PendingCompaction { + /// Push waiter to the request. + fn push_waiter(&mut self, mut waiter: OptionOutputTx) { + if let Some(waiter) = waiter.take_inner() { + self.waiters.push(waiter); + } + } } /// Status of running and pending region compaction tasks. struct CompactionStatus { - // Request waiting for compaction. - pending_request: Option, + /// Compaction pending to schedule. + /// + /// For simplicity, we merge all pending compaction requests into one. + pending_compaction: Option, } impl CompactionStatus { - fn merge_request(&mut self, request: CompactionRequest) { - unimplemented!() + /// Merge the watier to the pending compaction. + fn merge_waiter(&mut self, waiter: OptionOutputTx) { + let pending = self + .pending_compaction + .get_or_insert_with(|| PendingCompaction { + waiters: Vec::new(), + }); + pending.push_waiter(waiter); } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 9812e2c00f5d..f02884381be5 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -156,6 +156,8 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { + // FIXME(yingwen): Need to remove the region from the scheduler. + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. for waiter in waiters { waiter.send(Ok(Output::AffectedRows(0))); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 8d50e169e367..419627c9b024 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -551,7 +551,7 @@ impl Drop for FlushScheduler { struct FlushStatus { /// Current region. region: MitoRegionRef, - // TODO(yingwen): We can remove this flag. + // TODO(yingwen): Maybe we can remove this flag. /// There is a flush task running. flushing: bool, /// Task waiting for next flush. 
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3cef393f7c87..2f72724fd8a1 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -251,7 +251,7 @@ impl WorkerStarter { scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), - compaction_scheduler: CompactionScheduler::new(self.scheduler), + compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()), stalled_requests: StalledRequests::default(), listener: self.listener, }; diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 2c9038ad0db9..0342412fbd97 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -16,9 +16,7 @@ use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::compaction::CompactionRequest; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -33,10 +31,10 @@ impl RegionWorkerLoop { return; }; - let mut request = self.new_compaction_request(®ion); - // Push waiter to the request. - request.push_waiter(sender); - if let Err(e) = self.compaction_scheduler.schedule_compaction(request) { + if let Err(e) = self + .compaction_scheduler + .schedule_compaction(®ion, sender) + { error!(e; "Failed to schedule compaction task for region: {}", region_id); } else { info!( @@ -82,21 +80,4 @@ impl RegionWorkerLoop { pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); } - - /// Creates a new compaction request. 
- fn new_compaction_request(&self, region: &MitoRegionRef) -> CompactionRequest { - let current_version = region.version_control.current().version; - let access_layer = region.access_layer.clone(); - let file_purger = region.file_purger.clone(); - - CompactionRequest { - current_version, - access_layer, - ttl: None, // TODO(hl): get TTL info from region metadata - compaction_time_window: None, // TODO(hl): get persisted region compaction time window - request_sender: self.sender.clone(), - waiters: Vec::new(), - file_purger, - } - } } From 23474f091d1fd71fc1487b66e56aebe0983699de Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 17:29:41 +0800 Subject: [PATCH 04/14] feat: impl compaction scheduler --- src/mito2/src/compaction.rs | 165 ++++++++++++++++++++++++++++++------ src/mito2/src/flush.rs | 7 +- 2 files changed, 143 insertions(+), 29 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index c180a79a6c89..2600b037d7e7 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -22,14 +22,15 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use common_telemetry::debug; +use common_telemetry::{debug, error}; pub use picker::CompactionPickerRef; +use snafu::ResultExt; use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; -use crate::error::Result; +use crate::error::{CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, Result}; use crate::region::version::VersionRef; use crate::region::MitoRegionRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; @@ -76,7 +77,7 @@ pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> Compactio /// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, - /// Regions need compaction. + /// Compacting regions. region_status: HashMap, /// Request sender of the worker that this scheduler belongs to. request_sender: Sender, @@ -97,63 +98,145 @@ impl CompactionScheduler { region: &MitoRegionRef, waiter: OptionOutputTx, ) -> Result<()> { - if let Some(status) = self.region_status.get_mut(®ion.region_id) { - // Region is compacting or wait for compaction. Add the waiter to pending list. + let status = self + .region_status + .entry(region.region_id) + .or_insert_with(|| CompactionStatus::new(region.clone())); + if status.compacting { + // Region is compacting. Add the waiter to pending list. status.merge_waiter(waiter); return Ok(()); } // The region can compact directly. - let req = self.new_compaction_request(region, waiter); + let mut req = Self::new_compaction_request(self.request_sender.clone(), region); + // Merge with all pending requests. + if let Some(pending) = status.pending_compaction.take() { + req.waiters = pending.waiters; + } + req.push_waiter(waiter); + + // Submit compaction request. self.submit_request(req) } + /// Notifies the scheduler that the compaction job is finished successfully. + pub(crate) fn on_compaction_success(&mut self, region_id: RegionId) { + let Some(status) = self.region_status.get_mut(®ion_id) else { + return; + }; + + if status.pending_compaction.is_none() { + // The region doesn't have pending compaction request, we can remove it. + self.region_status.remove(®ion_id); + return; + } + + let region = status.region.clone(); + // Try to schedule next compaction task for this region. 
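// (`OptionOutputTx::none()` means no new caller is waiting on this follow-up run; any
// waiters buffered in `pending_compaction` are merged back into the request inside
// `schedule_compaction` itself.)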
+ if let Err(e) = self.schedule_compaction(®ion, OptionOutputTx::none()) { + error!(e; "Failed to schedule next compaction for region {}", region_id); + } + } + + /// Notifies the scheduler that the compaction job is failed. + pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc) { + error!(err; "Region {} failed to flush, cancel all pending tasks", region_id); + // Remove this region. + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Fast fail: cancels all pending tasks and sends error to their waiters. + status.on_failure(err); + } + + /// Notifies the scheduler that the region is dropped. + pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) { + // Remove this region. + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build())); + } + + /// Notifies the scheduler that the region is closed. + pub(crate) fn on_region_closed(&mut self, region_id: RegionId) { + // Remove this region. + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + } + /// Submit a compaction request. /// /// Callers must ensure that the region doesn't have compaction task running. - fn submit_request(&self, req: CompactionRequest) -> Result<()> { - self.scheduler.schedule(Box::pin(async { - // TODO(hl): build picker according to region options. - let picker = - compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, - req.region_id() - ); - // FIXME(yingwen): We should remove the region from region status. - let Some(mut task) = picker.pick(req) else { - return; - }; - task.run().await; - })) + fn submit_request(&mut self, req: CompactionRequest) -> Result<()> { + let region_id = req.region_id(); + self.scheduler + .schedule(Box::pin(async { + // TODO(hl): build picker according to region options. + let picker = compaction_strategy_to_picker(&CompactionStrategy::Twcs( + TwcsOptions::default(), + )); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, + req.region_id() + ); + // FIXME(yingwen): We should remove the region from region status. + let Some(mut task) = picker.pick(req) else { + return; + }; + task.run().await; + })) + .map_err(|e| { + error!(e; "Failed to submit compaction request for region {}", region_id); + + // If failed to submit the job, we need to remove the region from the scheduler. + self.region_status.remove(®ion_id); + + e + }) } /// Creates a new compaction request for compaction picker. 
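    /// Keeping this as an associated function (no `&self`) avoids borrowing the scheduler
    /// again while `schedule_compaction` still holds a mutable borrow of `region_status`;
    /// only the cloned request sender and the region are needed.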
fn new_compaction_request( - &self, + request_sender: Sender, region: &MitoRegionRef, - waiter: OptionOutputTx, ) -> CompactionRequest { let current_version = region.version_control.current().version; let access_layer = region.access_layer.clone(); let file_purger = region.file_purger.clone(); - let mut req = CompactionRequest { + let req = CompactionRequest { current_version, access_layer, ttl: None, // TODO(hl): get TTL info from region metadata compaction_time_window: None, // TODO(hl): get persisted region compaction time window - request_sender: self.request_sender.clone(), + request_sender: request_sender.clone(), waiters: Vec::new(), file_purger, }; - req.push_waiter(waiter); req } } +impl Drop for CompactionScheduler { + fn drop(&mut self) { + for (region_id, status) in self.region_status.drain() { + // We are shutting down so notify all pending tasks. + status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + } + } +} + /// Pending compaction tasks. struct PendingCompaction { waiters: Vec, @@ -166,10 +249,23 @@ impl PendingCompaction { self.waiters.push(waiter); } } + + /// Send flush error to waiter. + fn on_failure(&mut self, region_id: RegionId, err: Arc) { + for waiter in self.waiters.drain(..) { + waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id })); + } + } } /// Status of running and pending region compaction tasks. struct CompactionStatus { + region: MitoRegionRef, + /// Whether a compaction task is running. + /// + /// It may be failed to submit a compaction task so we need this flag to track + /// whether the region is compacting. + compacting: bool, /// Compaction pending to schedule. /// /// For simplicity, we merge all pending compaction requests into one. @@ -177,6 +273,15 @@ struct CompactionStatus { } impl CompactionStatus { + /// Creates a new [CompactionStatus] + fn new(region: MitoRegionRef) -> CompactionStatus { + CompactionStatus { + region, + compacting: false, + pending_compaction: None, + } + } + /// Merge the watier to the pending compaction. fn merge_waiter(&mut self, waiter: OptionOutputTx) { let pending = self @@ -186,4 +291,10 @@ impl CompactionStatus { }); pending.push_waiter(waiter); } + + fn on_failure(self, err: Arc) { + if let Some(mut pending) = self.pending_compaction { + pending.on_failure(self.region.region_id, err.clone()); + } + } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 419627c9b024..c871d8ae9380 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -378,6 +378,7 @@ impl FlushScheduler { return Ok(()); } + // TODO(yingwen): We can merge with pending and execute directly. // If there are pending tasks, then we should push it to pending list. if flush_status.pending_task.is_some() { flush_status.merge_task(task); @@ -518,7 +519,7 @@ impl FlushScheduler { debug_assert!(self .region_status .values() - .all(|status| !status.flushing && status.pending_task.is_some())); + .all(|status| status.flushing || status.pending_task.is_some())); // Get the first region from status map. let Some(flush_status) = self @@ -551,8 +552,10 @@ impl Drop for FlushScheduler { struct FlushStatus { /// Current region. region: MitoRegionRef, - // TODO(yingwen): Maybe we can remove this flag. /// There is a flush task running. + /// + /// It is possible that a region is not flushing but has pending task if the scheduler + /// doesn't schedules this region. flushing: bool, /// Task waiting for next flush. 
pending_task: Option, From 66673fbcee0f99731b99c7e055216949194865d2 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 17:35:53 +0800 Subject: [PATCH 05/14] feat: call compaction scheduler --- src/mito2/src/compaction.rs | 2 +- src/mito2/src/worker/handle_close.rs | 2 ++ src/mito2/src/worker/handle_compaction.rs | 6 ++++++ src/mito2/src/worker/handle_drop.rs | 2 ++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 2600b037d7e7..414b65118df5 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -121,7 +121,7 @@ impl CompactionScheduler { } /// Notifies the scheduler that the compaction job is finished successfully. - pub(crate) fn on_compaction_success(&mut self, region_id: RegionId) { + pub(crate) fn on_compaction_finished(&mut self, region_id: RegionId) { let Some(status) = self.region_status.get_mut(®ion_id) else { return; }; diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 03899dd59f4b..2d786dd9cb83 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -33,6 +33,8 @@ impl RegionWorkerLoop { self.regions.remove_region(region_id); // Clean flush status. self.flush_scheduler.on_region_closed(region_id); + // Clean compaction status. + self.compaction_scheduler.on_region_closed(region_id); info!("Region {} closed", region_id); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 0342412fbd97..394ba23055f0 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -74,10 +74,16 @@ impl RegionWorkerLoop { .version_control .apply_edit(edit, &[], region.file_purger.clone()); request.on_success(); + + // Schedule next compaction if necessary. + self.compaction_scheduler.on_compaction_finished(region_id); } /// When compaction fails, we simply log the error. pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); + + self.compaction_scheduler + .on_compaction_failed(req.region_id, req.err); } } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index cfb4aca89f41..f7aa5d15dc6a 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -52,6 +52,8 @@ impl RegionWorkerLoop { self.dropping_regions.insert_region(region.clone()); // Notifies flush scheduler. self.flush_scheduler.on_region_dropped(region_id); + // Notifies compaction scheduler. + self.compaction_scheduler.on_region_dropped(region_id); // mark region version as dropped region.version_control.mark_dropped(); From a93904001fa627366b2d8e5e059b2809509d1ca4 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 17:41:46 +0800 Subject: [PATCH 06/14] feat: remove status if nothing to compact --- src/mito2/src/compaction.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 414b65118df5..d6df7384956c 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -178,21 +178,24 @@ impl CompactionScheduler { /// Callers must ensure that the region doesn't have compaction task running. fn submit_request(&mut self, req: CompactionRequest) -> Result<()> { let region_id = req.region_id(); + // TODO(hl): build picker according to region options. 
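// (Until per-region options are plumbed through, every region falls back to the default
// time-window compaction strategy, `TwcsOptions::default()`.)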
+ let picker = + compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, + req.region_id() + ); + // TODO(yingwen): Picker should takes `&req` so we can notify waiters outside + // of picker. + let Some(mut task) = picker.pick(req) else { + // Nothing to compact, remove it from the region map. + self.region_status.remove(®ion_id); + return Ok(()); + }; + self.scheduler - .schedule(Box::pin(async { - // TODO(hl): build picker according to region options. - let picker = compaction_strategy_to_picker(&CompactionStrategy::Twcs( - TwcsOptions::default(), - )); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, - req.region_id() - ); - // FIXME(yingwen): We should remove the region from region status. - let Some(mut task) = picker.pick(req) else { - return; - }; + .schedule(Box::pin(async move { task.run().await; })) .map_err(|e| { From 93c6be902a48499a3cced66472b27569198a7c7c Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 17:45:14 +0800 Subject: [PATCH 07/14] feat: schedule compaction after flush --- src/mito2/src/compaction.rs | 2 ++ src/mito2/src/worker/handle_flush.rs | 13 ++++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d6df7384956c..761216e1a7b4 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -126,6 +126,8 @@ impl CompactionScheduler { return; }; + // TODO(yingwen): We should always try to compact the region until picker + // returns None. if status.pending_compaction.is_none() { // The region doesn't have pending compaction request, we can remove it. self.region_status.remove(®ion_id); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index cb2f2746dc99..531f2366d51c 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -14,7 +14,7 @@ //! Handling flush related requests. -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::storage::RegionId; @@ -198,6 +198,17 @@ impl RegionWorkerLoop { // We already stalled these requests, don't stall them again. self.handle_write_requests(stalled.requests, false).await; + // Schedules compaction. + if let Err(e) = self + .compaction_scheduler + .schedule_compaction(®ion, OptionOutputTx::none()) + { + warn!( + "Failed to schedule compaction after flush, region: {}, err: {}", + region.region_id, e + ); + } + self.listener.on_flush_success(region_id); } } From 6cb3a730e1f43453ecec283d7f225a1d7b6c2d5a Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 19:54:33 +0800 Subject: [PATCH 08/14] feat: set compacting to false after compaction finished --- src/mito2/src/compaction.rs | 65 ++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 761216e1a7b4..01c99e1f6f74 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -116,8 +116,33 @@ impl CompactionScheduler { } req.push_waiter(waiter); - // Submit compaction request. - self.submit_request(req) + // TODO(hl): build picker according to region options. 
+ let picker = + compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + let region_id = req.region_id(); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + let Some(mut task) = picker.pick(req) else { + // Nothing to compact, remove it from the region map. + self.region_status.remove(®ion_id); + return Ok(()); + }; + + // Submit the compaction task. + self.scheduler + .schedule(Box::pin(async move { + task.run().await; + })) + .map_err(|e| { + error!(e; "Failed to submit compaction request for region {}", region_id); + + // If failed to submit the job, we need to remove the region from the scheduler. + self.region_status.remove(®ion_id); + + e + }) } /// Notifies the scheduler that the compaction job is finished successfully. @@ -125,6 +150,7 @@ impl CompactionScheduler { let Some(status) = self.region_status.get_mut(®ion_id) else { return; }; + status.compacting = false; // TODO(yingwen): We should always try to compact the region until picker // returns None. @@ -175,41 +201,6 @@ impl CompactionScheduler { status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); } - /// Submit a compaction request. - /// - /// Callers must ensure that the region doesn't have compaction task running. - fn submit_request(&mut self, req: CompactionRequest) -> Result<()> { - let region_id = req.region_id(); - // TODO(hl): build picker according to region options. - let picker = - compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, - req.region_id() - ); - // TODO(yingwen): Picker should takes `&req` so we can notify waiters outside - // of picker. - let Some(mut task) = picker.pick(req) else { - // Nothing to compact, remove it from the region map. - self.region_status.remove(®ion_id); - return Ok(()); - }; - - self.scheduler - .schedule(Box::pin(async move { - task.run().await; - })) - .map_err(|e| { - error!(e; "Failed to submit compaction request for region {}", region_id); - - // If failed to submit the job, we need to remove the region from the scheduler. - self.region_status.remove(®ion_id); - - e - }) - } - /// Creates a new compaction request for compaction picker. fn new_compaction_request( request_sender: Sender, From 7ea6e897058452affb34b87f7e823974e1a21474 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 20:53:06 +0800 Subject: [PATCH 09/14] refactor: flush status only needs region id and version control --- src/mito2/src/flush.rs | 48 +++++++++++++++------------- src/mito2/src/worker/handle_alter.rs | 5 ++- src/mito2/src/worker/handle_flush.rs | 17 ++++++++-- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c871d8ae9380..b3b17665b4c3 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -30,8 +30,7 @@ use crate::error::{ }; use crate::memtable::MemtableBuilderRef; use crate::read::Source; -use crate::region::version::{VersionControlData, VersionRef}; -use crate::region::MitoRegionRef; +use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -218,10 +217,10 @@ impl RegionFlushTask { /// Converts the flush task into a background job. /// /// We must call this in the region worker. 
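    /// Taking a `VersionControlRef` instead of the whole region keeps the flush job
    /// independent of `MitoRegion`; the scheduler tests added later in this series build
    /// a mocked version control without constructing a full region.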
- fn into_flush_job(mut self, region: &MitoRegionRef) -> Job { + fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job { // Get a version of this region before creating a job to get current // wal entry id, sequence and immutable memtables. - let version_data = region.version_control.current(); + let version_data = version_control.current(); Box::pin(async move { self.do_flush(version_data).await; @@ -353,14 +352,15 @@ impl FlushScheduler { /// Schedules a flush `task` for specific `region`. pub(crate) fn schedule_flush( &mut self, - region: &MitoRegionRef, + region_id: RegionId, + version_control: &VersionControlRef, task: RegionFlushTask, ) -> Result<()> { - debug_assert_eq!(region.region_id, task.region_id); + debug_assert_eq!(region_id, task.region_id); - let version = region.version_control.current().version; + let version = version_control.current().version; if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() { - debug_assert!(!self.region_status.contains_key(®ion.region_id)); + debug_assert!(!self.region_status.contains_key(®ion_id)); // The region has nothing to flush. task.on_success(); return Ok(()); @@ -369,8 +369,8 @@ impl FlushScheduler { // Add this region to status map. let flush_status = self .region_status - .entry(region.region_id) - .or_insert_with(|| FlushStatus::new(region.clone())); + .entry(region_id) + .or_insert_with(|| FlushStatus::new(region_id, version_control.clone())); // Checks whether we can flush the region now. if flush_status.flushing { // There is already a flush job running. @@ -386,18 +386,16 @@ impl FlushScheduler { } // Now we can flush the region directly. - region - .version_control - .freeze_mutable(&task.memtable_builder); + version_control.freeze_mutable(&task.memtable_builder); // Submit a flush job. - let job = task.into_flush_job(region); + let job = task.into_flush_job(version_control); if let Err(e) = self.scheduler.schedule(job) { // If scheduler returns error, senders in the job will be dropped and waiters // can get recv errors. - error!(e; "Failed to schedule flush job for region {}", region.region_id); + error!(e; "Failed to schedule flush job for region {}", region_id); // Remove from region status if we can't submit the task. - self.region_status.remove(®ion.region_id); + self.region_status.remove(®ion_id); return Err(e); } flush_status.flushing = true; @@ -531,9 +529,10 @@ impl FlushScheduler { }; debug_assert!(!flush_status.flushing); let task = flush_status.pending_task.take().unwrap(); - let region = flush_status.region.clone(); + let region_id = flush_status.region_id; + let version_control = flush_status.version_control.clone(); - self.schedule_flush(®ion, task) + self.schedule_flush(region_id, &version_control, task) } } @@ -551,7 +550,9 @@ impl Drop for FlushScheduler { /// Tracks running and pending flush tasks and all pending requests of a region. struct FlushStatus { /// Current region. - region: MitoRegionRef, + region_id: RegionId, + /// Version control of the region. + version_control: VersionControlRef, /// There is a flush task running. 
/// /// It is possible that a region is not flushing but has pending task if the scheduler @@ -566,9 +567,10 @@ struct FlushStatus { } impl FlushStatus { - fn new(region: MitoRegionRef) -> FlushStatus { + fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus { FlushStatus { - region, + region_id, + version_control, flushing: false, pending_task: None, pending_ddls: Vec::new(), @@ -591,14 +593,14 @@ impl FlushStatus { } for ddl in self.pending_ddls { ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu { - region_id: self.region.region_id, + region_id: self.region_id, })); } for write_req in self.pending_writes { write_req .sender .send(Err(err.clone()).context(FlushRegionSnafu { - region_id: self.region.region_id, + region_id: self.region_id, })); } } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 38813d324c5d..61cb343e8072 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -55,7 +55,10 @@ impl RegionWorkerLoop { // Try to submit a flush task. let task = self.new_flush_task(®ion, FlushReason::Alter); - if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, ®ion.version_control, task) + { // Unable to flush the region, send error to waiter. sender.send(Err(e)); return; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 531f2366d51c..eee67dd0230f 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -39,7 +39,10 @@ impl RegionWorkerLoop { let mut task = self.new_flush_task(®ion, FlushReason::Manual); task.push_sender(sender); - if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, ®ion.version_control, task) + { error!(e; "Failed to schedule flush task for region {}", region.region_id); } } @@ -90,7 +93,11 @@ impl RegionWorkerLoop { if region.last_flush_millis() < min_last_flush_time { // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. 
let task = self.new_flush_task(region, FlushReason::EngineFull); - self.flush_scheduler.schedule_flush(region, task)?; + self.flush_scheduler.schedule_flush( + region.region_id, + ®ion.version_control, + task, + )?; } } @@ -99,7 +106,11 @@ impl RegionWorkerLoop { if let Some(region) = max_mem_region { if !self.flush_scheduler.is_flush_requested(region.region_id) { let task = self.new_flush_task(region, FlushReason::EngineFull); - self.flush_scheduler.schedule_flush(region, task)?; + self.flush_scheduler.schedule_flush( + region.region_id, + ®ion.version_control, + task, + )?; } } From 8df1198cbea5d7561a0d0e49cc9b7a6ccbac9dbe Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 15 Sep 2023 21:21:32 +0800 Subject: [PATCH 10/14] refactor: schedule_compaction don't need region as argument --- src/mito2/src/compaction.rs | 168 ++++++++++++---------- src/mito2/src/worker/handle_compaction.rs | 11 +- src/mito2/src/worker/handle_flush.rs | 11 +- 3 files changed, 108 insertions(+), 82 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 01c99e1f6f74..391e773578ea 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -31,8 +31,7 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; use crate::error::{CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, Result}; -use crate::region::version::VersionRef; -use crate::region::MitoRegionRef; +use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; @@ -95,13 +94,20 @@ impl CompactionScheduler { /// Schedules a compaction for the region. pub(crate) fn schedule_compaction( &mut self, - region: &MitoRegionRef, + region_id: RegionId, + version_control: &VersionControlRef, + access_layer: &AccessLayerRef, + file_purger: &FilePurgerRef, waiter: OptionOutputTx, ) -> Result<()> { - let status = self - .region_status - .entry(region.region_id) - .or_insert_with(|| CompactionStatus::new(region.clone())); + let status = self.region_status.entry(region_id).or_insert_with(|| { + CompactionStatus::new( + region_id, + version_control.clone(), + access_layer.clone(), + file_purger.clone(), + ) + }); if status.compacting { // Region is compacting. Add the waiter to pending list. status.merge_waiter(waiter); @@ -109,40 +115,8 @@ impl CompactionScheduler { } // The region can compact directly. - let mut req = Self::new_compaction_request(self.request_sender.clone(), region); - // Merge with all pending requests. - if let Some(pending) = status.pending_compaction.take() { - req.waiters = pending.waiters; - } - req.push_waiter(waiter); - - // TODO(hl): build picker according to region options. - let picker = - compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); - let region_id = req.region_id(); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, region_id - ); - let Some(mut task) = picker.pick(req) else { - // Nothing to compact, remove it from the region map. - self.region_status.remove(®ion_id); - return Ok(()); - }; - - // Submit the compaction task. - self.scheduler - .schedule(Box::pin(async move { - task.run().await; - })) - .map_err(|e| { - error!(e; "Failed to submit compaction request for region {}", region_id); - - // If failed to submit the job, we need to remove the region from the scheduler. 
- self.region_status.remove(®ion_id); - - e - }) + let request = status.new_compaction_request(self.request_sender.clone(), waiter); + self.schedule_compaction_request(request) } /// Notifies the scheduler that the compaction job is finished successfully. @@ -151,18 +125,11 @@ impl CompactionScheduler { return; }; status.compacting = false; - - // TODO(yingwen): We should always try to compact the region until picker - // returns None. - if status.pending_compaction.is_none() { - // The region doesn't have pending compaction request, we can remove it. - self.region_status.remove(®ion_id); - return; - } - - let region = status.region.clone(); + // We should always try to compact the region until picker returns None. + let request = + status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none()); // Try to schedule next compaction task for this region. - if let Err(e) = self.schedule_compaction(®ion, OptionOutputTx::none()) { + if let Err(e) = self.schedule_compaction_request(request) { error!(e; "Failed to schedule next compaction for region {}", region_id); } } @@ -201,26 +168,37 @@ impl CompactionScheduler { status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); } - /// Creates a new compaction request for compaction picker. - fn new_compaction_request( - request_sender: Sender, - region: &MitoRegionRef, - ) -> CompactionRequest { - let current_version = region.version_control.current().version; - let access_layer = region.access_layer.clone(); - let file_purger = region.file_purger.clone(); - - let req = CompactionRequest { - current_version, - access_layer, - ttl: None, // TODO(hl): get TTL info from region metadata - compaction_time_window: None, // TODO(hl): get persisted region compaction time window - request_sender: request_sender.clone(), - waiters: Vec::new(), - file_purger, + /// Schedules a compaction request. + /// + /// If the region has nothing to compact, it removes the region from the status map. + fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> { + // TODO(hl): build picker according to region options. + let picker = + compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + let region_id = request.region_id(); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + let Some(mut task) = picker.pick(request) else { + // Nothing to compact, remove it from the region status map. + self.region_status.remove(®ion_id); + return Ok(()); }; - req + // Submit the compaction task. + self.scheduler + .schedule(Box::pin(async move { + task.run().await; + })) + .map_err(|e| { + error!(e; "Failed to submit compaction request for region {}", region_id); + + // If failed to submit the job, we need to remove the region from the scheduler. + self.region_status.remove(®ion_id); + + e + }) } } @@ -256,7 +234,14 @@ impl PendingCompaction { /// Status of running and pending region compaction tasks. struct CompactionStatus { - region: MitoRegionRef, + /// Id of the region. + region_id: RegionId, + /// Version control of the region. + version_control: VersionControlRef, + /// Access layer of the region. + access_layer: AccessLayerRef, + /// File purger of the region. + file_purger: FilePurgerRef, /// Whether a compaction task is running. 
/// /// It may be failed to submit a compaction task so we need this flag to track @@ -270,9 +255,17 @@ struct CompactionStatus { impl CompactionStatus { /// Creates a new [CompactionStatus] - fn new(region: MitoRegionRef) -> CompactionStatus { + fn new( + region_id: RegionId, + version_control: VersionControlRef, + access_layer: AccessLayerRef, + file_purger: FilePurgerRef, + ) -> CompactionStatus { CompactionStatus { - region, + region_id, + version_control, + access_layer, + file_purger, compacting: false, pending_compaction: None, } @@ -290,7 +283,34 @@ impl CompactionStatus { fn on_failure(self, err: Arc) { if let Some(mut pending) = self.pending_compaction { - pending.on_failure(self.region.region_id, err.clone()); + pending.on_failure(self.region_id, err.clone()); } } + + /// Creates a new compaction request for compaction picker. + /// + /// It consumes all pending compaction waiters. + fn new_compaction_request( + &mut self, + request_sender: Sender, + waiter: OptionOutputTx, + ) -> CompactionRequest { + let current_version = self.version_control.current().version; + let mut req = CompactionRequest { + current_version, + access_layer: self.access_layer.clone(), + ttl: None, // TODO(hl): get TTL info from region metadata + compaction_time_window: None, // TODO(hl): get persisted region compaction time window + request_sender: request_sender.clone(), + waiters: Vec::new(), + file_purger: self.file_purger.clone(), + }; + + if let Some(pending) = self.pending_compaction.take() { + req.waiters = pending.waiters; + } + req.push_waiter(waiter); + + req + } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 394ba23055f0..0e0f3a07ef34 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -31,10 +31,13 @@ impl RegionWorkerLoop { return; }; - if let Err(e) = self - .compaction_scheduler - .schedule_compaction(®ion, sender) - { + if let Err(e) = self.compaction_scheduler.schedule_compaction( + region.region_id, + ®ion.version_control, + ®ion.access_layer, + ®ion.file_purger, + sender, + ) { error!(e; "Failed to schedule compaction task for region: {}", region_id); } else { info!( diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index eee67dd0230f..28afc1dfc0c9 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -210,10 +210,13 @@ impl RegionWorkerLoop { self.handle_write_requests(stalled.requests, false).await; // Schedules compaction. 
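        // (Compaction after a successful flush is opportunistic: a scheduling failure is
        // only logged as a warning and never fails the flush path itself.)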
- if let Err(e) = self - .compaction_scheduler - .schedule_compaction(®ion, OptionOutputTx::none()) - { + if let Err(e) = self.compaction_scheduler.schedule_compaction( + region.region_id, + ®ion.version_control, + ®ion.access_layer, + ®ion.file_purger, + OptionOutputTx::none(), + ) { warn!( "Failed to schedule compaction after flush, region: {}, err: {}", region.region_id, e From 26837a793bd3dc5a32a6f44187f952710fa7479d Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 16 Sep 2023 16:05:06 +0800 Subject: [PATCH 11/14] test: test flush/scheduler for empty requests --- src/mito2/src/compaction.rs | 56 ++++++++++- src/mito2/src/flush.rs | 32 ++++++ src/mito2/src/memtable.rs | 56 +---------- src/mito2/src/test_util.rs | 4 + src/mito2/src/test_util/memtable_util.rs | 81 +++++++++++++++ src/mito2/src/test_util/scheduler_util.rs | 77 +++++++++++++++ src/mito2/src/test_util/version_util.rs | 115 ++++++++++++++++++++++ 7 files changed, 363 insertions(+), 58 deletions(-) create mode 100644 src/mito2/src/test_util/memtable_util.rs create mode 100644 src/mito2/src/test_util/scheduler_util.rs create mode 100644 src/mito2/src/test_util/version_util.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 391e773578ea..0a2ec880d8a4 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -243,9 +243,6 @@ struct CompactionStatus { /// File purger of the region. file_purger: FilePurgerRef, /// Whether a compaction task is running. - /// - /// It may be failed to submit a compaction task so we need this flag to track - /// whether the region is compacting. compacting: bool, /// Compaction pending to schedule. /// @@ -314,3 +311,56 @@ impl CompactionStatus { req } } + +#[cfg(test)] +mod tests { + use common_query::Output; + use tokio::sync::oneshot; + + use super::*; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::version_util::VersionControlBuilder; + + #[tokio::test] + async fn test_schedule_empty() { + let env = SchedulerEnv::new(); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let purger = builder.file_purger(); + + // Nothing to compact. + let version_control = Arc::new(builder.build()); + let (output_tx, output_rx) = oneshot::channel(); + let waiter = OptionOutputTx::from(output_tx); + scheduler + .schedule_compaction( + builder.region_id(), + &version_control, + &env.access_layer, + &purger, + waiter, + ) + .unwrap(); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); + + // Only one file, picker won't compact it. 
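        // (A single SST is below the TWCS picker's file-count threshold, so the request is
        // answered with `AffectedRows(0)` and the scheduler drops the region's status entry,
        // which the assertions below check.)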
+ let version_control = Arc::new(builder.push_l0_file(0, 1000).build()); + let (output_tx, output_rx) = oneshot::channel(); + let waiter = OptionOutputTx::from(output_tx); + scheduler + .schedule_compaction( + builder.region_id(), + &version_control, + &env.access_layer, + &purger, + waiter, + ) + .unwrap(); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); + } +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index b3b17665b4c3..a22280250dee 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -608,7 +608,11 @@ impl FlushStatus { #[cfg(test)] mod tests { + use tokio::sync::oneshot; + use super::*; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::version_util::VersionControlBuilder; #[test] fn test_get_mutable_limit() { @@ -662,4 +666,32 @@ mod tests { manager.reserve_mem(100); assert!(manager.should_flush_engine()); } + + #[tokio::test] + async fn test_schedule_empty() { + let env = SchedulerEnv::new(); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_flush_scheduler(); + let builder = VersionControlBuilder::new(); + + let version_control = Arc::new(builder.build()); + let (output_tx, output_rx) = oneshot::channel(); + let mut task = RegionFlushTask { + region_id: builder.region_id(), + reason: FlushReason::Others, + senders: Vec::new(), + request_sender: tx, + access_layer: env.access_layer.clone(), + memtable_builder: builder.memtable_builder(), + file_purger: builder.file_purger(), + }; + task.push_sender(OptionOutputTx::from(output_tx)); + scheduler + .schedule_flush(builder.region_id(), &version_control, task) + .unwrap(); + assert!(scheduler.region_status.is_empty()); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); + } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 82675e9ed3eb..d9c2f2e40b0c 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -20,7 +20,7 @@ pub mod key_values; pub(crate) mod version; use std::fmt; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use common_query::logical_plan::Expr; @@ -89,45 +89,6 @@ pub trait MemtableBuilder: Send + Sync + fmt::Debug { pub type MemtableBuilderRef = Arc; -// TODO(yingwen): Remove it once we port the memtable. -/// Empty memtable for test. -#[derive(Debug, Default)] -pub(crate) struct EmptyMemtable { - /// Id of this memtable. - id: MemtableId, -} - -impl EmptyMemtable { - /// Returns a new memtable with specific `id`. - pub(crate) fn new(id: MemtableId) -> EmptyMemtable { - EmptyMemtable { id } - } -} - -impl Memtable for EmptyMemtable { - fn id(&self) -> MemtableId { - self.id - } - - fn write(&self, _kvs: &KeyValues) -> Result<()> { - Ok(()) - } - - fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { - Box::new(std::iter::empty()) - } - - fn is_empty(&self) -> bool { - true - } - - fn mark_immutable(&self) {} - - fn stats(&self) -> MemtableStats { - MemtableStats::default() - } -} - /// Memtable memory allocation tracker. #[derive(Default)] pub struct AllocTracker { @@ -205,21 +166,6 @@ impl Drop for AllocTracker { } } -/// Default memtable builder. -#[derive(Debug, Default)] -pub(crate) struct DefaultMemtableBuilder { - /// Next memtable id. 
- next_id: AtomicU32, -} - -impl MemtableBuilder for DefaultMemtableBuilder { - fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(EmptyMemtable::new( - self.next_id.fetch_add(1, Ordering::Relaxed), - )) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 2f29c8aa6da0..43195da18f82 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -14,6 +14,10 @@ //! Utilities for testing. +pub mod memtable_util; +pub mod scheduler_util; +pub mod version_util; + use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs new file mode 100644 index 000000000000..4b6c4142e753 --- /dev/null +++ b/src/mito2/src/test_util/memtable_util.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Memtable test utilities. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use common_query::logical_plan::Expr; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; + +use crate::error::Result; +use crate::memtable::{ + BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, + MemtableStats, +}; + +/// Empty memtable for test. +#[derive(Debug, Default)] +pub(crate) struct EmptyMemtable { + /// Id of this memtable. + id: MemtableId, +} + +impl EmptyMemtable { + /// Returns a new memtable with specific `id`. + pub(crate) fn new(id: MemtableId) -> EmptyMemtable { + EmptyMemtable { id } + } +} + +impl Memtable for EmptyMemtable { + fn id(&self) -> MemtableId { + self.id + } + + fn write(&self, _kvs: &KeyValues) -> Result<()> { + Ok(()) + } + + fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { + Box::new(std::iter::empty()) + } + + fn is_empty(&self) -> bool { + true + } + + fn mark_immutable(&self) {} + + fn stats(&self) -> MemtableStats { + MemtableStats::default() + } +} + +/// Empty memtable builder. +#[derive(Debug, Default)] +pub(crate) struct EmptyMemtableBuilder { + /// Next memtable id. + next_id: AtomicU32, +} + +impl MemtableBuilder for EmptyMemtableBuilder { + fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(EmptyMemtable::new( + self.next_id.fetch_add(1, Ordering::Relaxed), + )) + } +} diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs new file mode 100644 index 000000000000..5e1e259e75d9 --- /dev/null +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to mock flush and compaction schedulers. + +use std::sync::Arc; + +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use object_store::services::Fs; +use object_store::ObjectStore; +use tokio::sync::mpsc::Sender; + +use crate::access_layer::{AccessLayer, AccessLayerRef}; +use crate::compaction::CompactionScheduler; +use crate::flush::FlushScheduler; +use crate::request::WorkerRequest; +use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; + +/// Scheduler mocker. +pub(crate) struct SchedulerEnv { + #[allow(unused)] + path: TempDir, + /// Mock access layer for test. + pub(crate) access_layer: AccessLayerRef, + scheduler: Option, +} + +impl SchedulerEnv { + /// Creates a new mocker. + pub(crate) fn new() -> SchedulerEnv { + let path = create_temp_dir(""); + let mut builder = Fs::default(); + builder.root(path.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let access_layer = Arc::new(AccessLayer::new("", object_store.clone())); + + SchedulerEnv { + path: create_temp_dir(""), + access_layer, + scheduler: None, + } + } + + /// Creates a new compaction scheduler. + pub(crate) fn mock_compaction_scheduler( + &self, + request_sender: Sender, + ) -> CompactionScheduler { + let scheduler = self.get_scheduler(); + + CompactionScheduler::new(scheduler, request_sender) + } + + /// Creates a new flush scheduler. + pub(crate) fn mock_flush_scheduler(&self) -> FlushScheduler { + let scheduler = self.get_scheduler(); + + FlushScheduler::new(scheduler) + } + + fn get_scheduler(&self) -> SchedulerRef { + self.scheduler + .clone() + .unwrap_or_else(|| Arc::new(LocalScheduler::new(1))) + } +} diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs new file mode 100644 index 000000000000..2fee8a987b06 --- /dev/null +++ b/src/mito2/src/test_util/version_util.rs @@ -0,0 +1,115 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to mock version. 
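+//!
+//! `VersionControlBuilder` assembles a minimal region version (an empty
+//! memtable plus optional level-0 files with placeholder metadata) so that
+//! flush and compaction scheduler tests can run without writing real data.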
+ +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::storage::RegionId; + +use crate::memtable::{MemtableBuilder, MemtableBuilderRef}; +use crate::region::version::{Version, VersionBuilder, VersionControl}; +use crate::sst::file::{FileId, FileMeta}; +use crate::sst::file_purger::FilePurgerRef; +use crate::test_util::memtable_util::EmptyMemtableBuilder; +use crate::test_util::new_noop_file_purger; + +fn new_region_metadata(region_id: RegionId) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(region_id); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() +} + +// Builder to mock a version control. +pub(crate) struct VersionControlBuilder { + metadata: RegionMetadata, + file_purger: FilePurgerRef, + memtable_builder: Arc, + files: HashMap, +} + +impl VersionControlBuilder { + pub(crate) fn new() -> VersionControlBuilder { + VersionControlBuilder { + metadata: new_region_metadata(RegionId::new(1, 1)), + file_purger: new_noop_file_purger(), + memtable_builder: Arc::new(EmptyMemtableBuilder::default()), + files: HashMap::new(), + } + } + + pub(crate) fn region_id(&self) -> RegionId { + self.metadata.region_id + } + + pub(crate) fn file_purger(&self) -> FilePurgerRef { + self.file_purger.clone() + } + + pub(crate) fn memtable_builder(&self) -> MemtableBuilderRef { + self.memtable_builder.clone() + } + + pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self { + let file_id = FileId::random(); + self.files.insert( + file_id, + FileMeta { + region_id: self.metadata.region_id, + file_id, + time_range: ( + Timestamp::new_millisecond(start_ms), + Timestamp::new_millisecond(end_ms), + ), + level: 0, + file_size: 0, // We don't care file size. + }, + ); + self + } + + pub(crate) fn build_version(&self) -> Version { + let metadata = Arc::new(self.metadata.clone()); + let mutable = self.memtable_builder.build(&metadata); + VersionBuilder::new(metadata, mutable) + .add_files(self.file_purger.clone(), self.files.values().cloned()) + .build() + } + + pub(crate) fn build(&self) -> VersionControl { + let version = self.build_version(); + VersionControl::new(version) + } +} From fe55ff7f8b55c620e76d8d4c129c7e98bcc5b4c5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 16 Sep 2023 16:23:56 +0800 Subject: [PATCH 12/14] test: trigger compaction in test --- src/mito2/src/compaction.rs | 2 ++ src/mito2/src/compaction/twcs.rs | 2 -- src/mito2/src/engine/compaction_test.rs | 16 +++++++++++----- src/mito2/src/read/scan_region.rs | 7 +++++++ src/mito2/src/read/seq_scan.rs | 5 +++++ 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0a2ec880d8a4..ffff9bebe4b8 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -116,6 +116,8 @@ impl CompactionScheduler { // The region can compact directly. 
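+ // The request snapshots the region's current version and carries the waiter;
+ // marking the region as compacting before submission (below) presumably
+ // prevents a second task from being scheduled for the same region.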
let request = status.new_compaction_request(self.request_sender.clone(), waiter); + // Mark the region as compacting. + status.compacting = true; self.schedule_compaction_request(request) } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index f02884381be5..9812e2c00f5d 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -156,8 +156,6 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { - // FIXME(yingwen): Need to remove the region from the scheduler. - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. for waiter in waiters { waiter.send(Ok(Output::AffectedRows(0))); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 9c196d78b900..9a88ecbc4989 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -121,10 +121,12 @@ async fn test_compaction_region() { .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); + // Flush 5 SSTs for compaction. put_and_flush(&engine, region_id, &column_schemas, 0..10).await; put_and_flush(&engine, region_id, &column_schemas, 10..20).await; put_and_flush(&engine, region_id, &column_schemas, 20..30).await; - delete_and_flush(&engine, region_id, &column_schemas, 25..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 15..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let output = engine .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) @@ -132,10 +134,14 @@ async fn test_compaction_region() { .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); - let stream = engine - .handle_query(region_id, ScanRequest::default()) - .await - .unwrap(); + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 1, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 752821834e94..be8b318d83ff 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -59,6 +59,13 @@ impl Scanner { Scanner::Seq(seq_scan) => seq_scan.num_memtables(), } } + + /// Returns SST file ids to scan. + pub(crate) fn file_ids(&self) -> Vec { + match self { + Scanner::Seq(seq_scan) => seq_scan.file_ids(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 28e5249fecc8..148cb3777142 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -156,4 +156,9 @@ impl SeqScan { pub(crate) fn num_files(&self) -> usize { self.files.len() } + + /// Returns SST file ids to scan. 
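+ ///
+ /// Added for tests that need to assert which SSTs a scan touches.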
+ pub(crate) fn file_ids(&self) -> Vec { + self.files.iter().map(|file| file.file_id()).collect() + } } From b734dc8aa89ffafe71b322f85a51cf735a305461 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 16 Sep 2023 16:35:42 +0800 Subject: [PATCH 13/14] feat: notify scheduler on truncated --- src/mito2/src/compaction.rs | 39 ++++++++++++++++--------- src/mito2/src/error.rs | 6 ++-- src/mito2/src/flush.rs | 13 +++++---- src/mito2/src/worker/handle_flush.rs | 4 +-- src/mito2/src/worker/handle_truncate.rs | 5 ++-- 5 files changed, 40 insertions(+), 27 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index ffff9bebe4b8..9b1aa03130b0 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -30,7 +30,9 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; -use crate::error::{CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, Result}; +use crate::error::{ + CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, +}; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; @@ -150,24 +152,23 @@ impl CompactionScheduler { /// Notifies the scheduler that the region is dropped. pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) { - // Remove this region. - let Some(status) = self.region_status.remove(®ion_id) else { - return; - }; - - // Notifies all pending tasks. - status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build())); + self.remove_region_on_failure( + region_id, + Arc::new(RegionDroppedSnafu { region_id }.build()), + ); } /// Notifies the scheduler that the region is closed. pub(crate) fn on_region_closed(&mut self, region_id: RegionId) { - // Remove this region. - let Some(status) = self.region_status.remove(®ion_id) else { - return; - }; + self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); + } - // Notifies all pending tasks. - status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + /// Notifies the scheduler that the region is truncated. + pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionTruncatedSnafu { region_id }.build()), + ); } /// Schedules a compaction request. @@ -202,6 +203,16 @@ impl CompactionScheduler { e }) } + + fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { + // Remove this region. + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + status.on_failure(err); + } } impl Drop for CompactionScheduler { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 4925e76b607a..7986db4c8a81 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -402,8 +402,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Region {} is truncating, location: {}", region_id, location))] - RegionTruncating { + #[snafu(display("Region {} is truncated, location: {}", region_id, location))] + RegionTruncated { region_id: RegionId, location: Location, }, @@ -516,7 +516,7 @@ impl ErrorExt for Error { FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, - RegionTruncating { .. } => StatusCode::Cancelled, + RegionTruncated { .. 
} => StatusCode::Cancelled, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index a22280250dee..73c7d54b9a78 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -26,7 +26,7 @@ use tokio::sync::mpsc; use crate::access_layer::AccessLayerRef; use crate::error::{ - Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result, + Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; use crate::memtable::MemtableBuilderRef; use crate::read::Source; @@ -434,7 +434,7 @@ impl FlushScheduler { pending_requests } - /// Notifies the scheduler that the flush job is finished. + /// Notifies the scheduler that the flush job is failed. pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc) { error!(err; "Region {} failed to flush, cancel all pending tasks", region_id); @@ -465,15 +465,15 @@ impl FlushScheduler { self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); } - /// Notifies the scheduler that the region is truncating. - pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) { + /// Notifies the scheduler that the region is truncated. + pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) { self.remove_region_on_failure( region_id, - Arc::new(RegionTruncatingSnafu { region_id }.build()), + Arc::new(RegionTruncatedSnafu { region_id }.build()), ); } - pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { + fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { // Remove this region. let Some(flush_status) = self.region_status.remove(®ion_id) else { return; @@ -684,6 +684,7 @@ mod tests { access_layer: env.access_layer.clone(), memtable_builder: builder.memtable_builder(), file_purger: builder.file_purger(), + listener: WorkerListener::default(), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 28afc1dfc0c9..2c1bad5cf544 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -19,7 +19,7 @@ use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::error::{RegionTruncatingSnafu, Result}; +use crate::error::{RegionTruncatedSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; @@ -152,7 +152,7 @@ impl RegionWorkerLoop { let version_data = region.version_control.current(); if let Some(truncated_entry_id) = version_data.version.truncated_entry_id { if truncated_entry_id >= request.flushed_entry_id { - request.on_failure(RegionTruncatingSnafu { region_id }.build()); + request.on_failure(RegionTruncatedSnafu { region_id }.build()); return; } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 759f17b90cff..5b1e9db8a90d 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -44,8 +44,9 @@ impl RegionWorkerLoop { region.manifest_manager.update(action_list).await?; // Notifies flush scheduler. 
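+ // Truncate resets the region version and marks all SSTs deleted (below), so
+ // pending flush and compaction waiters are failed with `RegionTruncated`
+ // instead of being left hanging.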
- self.flush_scheduler.on_region_truncating(region_id); - // TODO(DevilExileSu): Notifies compaction scheduler. + self.flush_scheduler.on_region_truncated(region_id); + // Notifies compaction scheduler. + self.compaction_scheduler.on_region_truncated(region_id); // Reset region's version and mark all SSTs deleted. region.version_control.truncate( From cb56ea8358a7f63749655fb0e236e1fd802980ee Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sun, 17 Sep 2023 15:30:56 +0800 Subject: [PATCH 14/14] chore: Apply suggestions from code review Co-authored-by: JeremyHi --- src/mito2/src/compaction.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 9b1aa03130b0..8aff4414875d 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -309,8 +309,10 @@ impl CompactionStatus { let mut req = CompactionRequest { current_version, access_layer: self.access_layer.clone(), - ttl: None, // TODO(hl): get TTL info from region metadata - compaction_time_window: None, // TODO(hl): get persisted region compaction time window + // TODO(hl): get TTL info from region metadata + ttl: None, + // TODO(hl): get persisted region compaction time window + compaction_time_window: None, request_sender: request_sender.clone(), waiters: Vec::new(), file_purger: self.file_purger.clone(),