diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index dd08865581f1..2eb0cf3d8504 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -13,7 +13,8 @@ // limitations under the License. mod buckets; -mod picker; +pub mod compactor; +pub mod picker; mod task; #[cfg(test)] mod test_util; @@ -31,7 +32,6 @@ use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datafusion_common::ScalarValue; use datafusion_expr::Expr; -pub use picker::CompactionPickerRef; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -40,8 +40,9 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; -use crate::compaction::twcs::TwcsPicker; -use crate::compaction::window::WindowedCompactionPicker; +use crate::compaction::compactor::{CompactionRegion, DefaultCompactor}; +use crate::compaction::picker::{new_picker, CompactionTask}; +use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, @@ -52,7 +53,6 @@ use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; use crate::read::BoxedBatchReader; -use crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; @@ -90,17 +90,6 @@ impl CompactionRequest { } } -/// Builds compaction picker according to [CompactionOptions]. -pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef { - match strategy { - CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( - twcs_opts.max_active_window_files, - twcs_opts.max_inactive_window_files, - twcs_opts.time_window_seconds(), - )) as Arc<_>, - } -} - /// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, @@ -232,34 +221,13 @@ impl CompactionScheduler { request: CompactionRequest, options: compact_request::Options, ) -> Result<()> { - let picker = if let compact_request::Options::StrictWindow(window) = &options { - let window = if window.window_seconds == 0 { - None - } else { - Some(window.window_seconds) - }; - Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> - } else { - compaction_options_to_picker(&request.current_version.options.compaction) - }; - let region_id = request.region_id(); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, region_id - ); - - let pick_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick"]) - .start_timer(); - let Some(mut task) = picker.pick(request) else { + let Some(mut task) = self.build_compaction_task(request, options) else { // Nothing to compact, remove it from the region status map. self.region_status.remove(®ion_id); return Ok(()); }; - drop(pick_timer); - // Submit the compaction task. self.scheduler .schedule(Box::pin(async move { @@ -282,6 +250,70 @@ impl CompactionScheduler { // Notifies all pending tasks. status.on_failure(err); } + + fn build_compaction_task( + &self, + req: CompactionRequest, + options: compact_request::Options, + ) -> Option> { + let picker = new_picker(options, &req.current_version.options.compaction); + let region_id = req.region_id(); + let CompactionRequest { + engine_config, + current_version, + access_layer, + request_sender, + waiters, + start_time, + cache_manager, + manifest_ctx, + listener, + } = req; + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + + let compaction_region = CompactionRegion { + region_id, + current_version: current_version.clone(), + region_options: current_version.options.clone(), + engine_config: engine_config.clone(), + region_metadata: current_version.metadata.clone(), + cache_manager: cache_manager.clone(), + access_layer: access_layer.clone(), + manifest_ctx: manifest_ctx.clone(), + }; + + let picker_output = { + let _pick_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick"]) + .start_timer(); + picker.pick(&compaction_region) + }; + + let picker_output = if let Some(picker_output) = picker_output { + picker_output + } else { + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. + for waiter in waiters { + waiter.send(Ok(0)); + } + return None; + }; + + let task = CompactionTaskImpl { + request_sender, + waiters, + start_time, + listener, + picker_output, + compaction_region, + compactor: Arc::new(DefaultCompactor {}), + }; + + Some(Box::new(task)) + } } impl Drop for CompactionScheduler { @@ -395,8 +427,8 @@ impl CompactionStatus { } } -#[derive(Debug)] -pub(crate) struct CompactionOutput { +#[derive(Debug, Clone)] +pub struct CompactionOutput { pub output_file_id: FileId, /// Compaction output file level. pub output_level: Level, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs new file mode 100644 index 000000000000..a6694c8ef7bd --- /dev/null +++ b/src/mito2/src/compaction/compactor.rs @@ -0,0 +1,416 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::region::compact_request; +use common_telemetry::info; +use object_store::manager::ObjectStoreManager; +use smallvec::SmallVec; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; +use crate::cache::{CacheManager, CacheManagerRef}; +use crate::compaction::build_sst_reader; +use crate::compaction::picker::{new_picker, PickerOutput}; +use crate::config::MitoConfig; +use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::manifest::storage::manifest_compress_type; +use crate::memtable::time_partition::TimePartitions; +use crate::memtable::MemtableBuilderProvider; +use crate::read::Source; +use crate::region::opener::new_manifest_dir; +use crate::region::options::RegionOptions; +use crate::region::version::{VersionBuilder, VersionControl, VersionRef}; +use crate::region::ManifestContext; +use crate::region::RegionState::Writable; +use crate::schedule::scheduler::LocalScheduler; +use crate::sst::file::{FileMeta, IndexType}; +use crate::sst::file_purger::LocalFilePurger; +use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::parquet::WriteOptions; + +/// CompactionRegion represents a region that needs to be compacted. +/// It's the subset of MitoRegion. +#[derive(Clone)] +pub struct CompactionRegion { + pub region_id: RegionId, + pub region_options: RegionOptions, + + pub(crate) engine_config: Arc, + pub(crate) region_metadata: RegionMetadataRef, + pub(crate) cache_manager: CacheManagerRef, + pub(crate) access_layer: AccessLayerRef, + pub(crate) manifest_ctx: Arc, + pub(crate) current_version: VersionRef, +} + +/// CompactorRequest represents the request to compact a region. +#[derive(Debug, Clone)] +pub struct CompactorRequest { + pub region_id: RegionId, + pub region_dir: String, + pub region_options: HashMap, + pub compaction_options: compact_request::Options, + pub picker_output: PickerOutput, +} + +/// Open a compaction region from a compaction request. +/// It's simple version of RegionOpener::open(). +pub async fn open_compaction_region( + req: &CompactorRequest, + mito_config: &MitoConfig, + object_store_manager: ObjectStoreManager, +) -> Result { + let region_options = RegionOptions::try_from(&req.region_options)?; + let object_store = { + let name = ®ion_options.storage; + if let Some(name) = name { + object_store_manager + .find(name) + .context(ObjectStoreNotFoundSnafu { + object_store: name.to_string(), + })? + } else { + object_store_manager.default_object_store() + } + }; + + let access_layer = { + let intermediate_manager = + IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) + .await?; + + Arc::new(AccessLayer::new( + req.region_dir.as_str(), + object_store.clone(), + intermediate_manager, + )) + }; + + let manifest_manager = { + let region_manifest_options = RegionManifestOptions { + manifest_dir: new_manifest_dir(req.region_dir.as_str()), + object_store: object_store.clone(), + compress_type: manifest_compress_type(mito_config.compress_manifest), + checkpoint_distance: mito_config.manifest_checkpoint_distance, + }; + + RegionManifestManager::open(region_manifest_options, Default::default()) + .await? + .context(EmptyRegionDirSnafu { + region_id: req.region_id, + region_dir: req.region_dir.as_str(), + })? + }; + + let manifest = manifest_manager.manifest(); + let region_metadata = manifest.metadata.clone(); + let manifest_ctx = Arc::new(ManifestContext::new(manifest_manager, Writable)); + + let file_purger = { + let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs)); + Arc::new(LocalFilePurger::new( + purge_scheduler.clone(), + access_layer.clone(), + None, + )) + }; + + let current_version = { + let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone())) + .builder_for_options( + region_options.memtable.as_ref(), + !region_options.append_mode, + ); + + // Initial memtable id is 0. + let mutable = Arc::new(TimePartitions::new( + region_metadata.clone(), + memtable_builder.clone(), + 0, + region_options.compaction.time_window(), + )); + + let version = VersionBuilder::new(region_metadata.clone(), mutable) + .add_files(file_purger.clone(), manifest.files.values().cloned()) + .flushed_entry_id(manifest.flushed_entry_id) + .flushed_sequence(manifest.flushed_sequence) + .truncated_entry_id(manifest.truncated_entry_id) + .compaction_time_window(manifest.compaction_time_window) + .options(region_options.clone()) + .build(); + let version_control = Arc::new(VersionControl::new(version)); + version_control.current().version + }; + + Ok(CompactionRegion { + region_options: region_options.clone(), + manifest_ctx, + access_layer, + current_version, + region_id: req.region_id, + cache_manager: Arc::new(CacheManager::default()), + engine_config: Arc::new(mito_config.clone()), + region_metadata: region_metadata.clone(), + }) +} + +/// `[MergeOutput]` represents the output of merging SST files. +#[derive(Default, Clone, Debug)] +pub struct MergeOutput { + pub files_to_add: Vec, + pub files_to_remove: Vec, + pub compaction_time_window: Option, +} + +impl MergeOutput { + pub fn is_empty(&self) -> bool { + self.files_to_add.is_empty() && self.files_to_remove.is_empty() + } +} + +/// Compactor is the trait that defines the compaction logic. +#[async_trait::async_trait] +pub trait Compactor: Send + Sync + 'static { + /// Merge SST files for a region. + async fn merge_ssts( + &self, + compaction_region: &CompactionRegion, + picker_output: PickerOutput, + ) -> Result; + + /// Update the manifest after merging SST files. + async fn update_manifest( + &self, + compaction_region: &CompactionRegion, + merge_output: MergeOutput, + ) -> Result; + + /// Execute compaction for a region. + async fn compact( + &self, + compaction_region: &CompactionRegion, + compact_request_options: compact_request::Options, + ) -> Result<()>; +} + +/// DefaultCompactor is the default implementation of Compactor. +pub struct DefaultCompactor; + +#[async_trait::async_trait] +impl Compactor for DefaultCompactor { + async fn merge_ssts( + &self, + compaction_region: &CompactionRegion, + mut picker_output: PickerOutput, + ) -> Result { + let mut futs = Vec::with_capacity(picker_output.outputs.len()); + let mut compacted_inputs = + Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); + + for output in picker_output.outputs.drain(..) { + compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); + + info!( + "Compaction region {} output [{}]-> {}", + compaction_region.region_id, + output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .collect::>() + .join(","), + output.output_file_id + ); + + let write_opts = WriteOptions { + write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + ..Default::default() + }; + let create_inverted_index = compaction_region + .engine_config + .inverted_index + .create_on_compaction + .auto(); + let mem_threshold_index_create = compaction_region + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| m.as_bytes() as _); + let index_write_buffer_size = Some( + compaction_region + .engine_config + .inverted_index + .write_buffer_size + .as_bytes() as usize, + ); + + let region_metadata = compaction_region.region_metadata.clone(); + let sst_layer = compaction_region.access_layer.clone(); + let region_id = compaction_region.region_id; + let file_id = output.output_file_id; + let cache_manager = compaction_region.cache_manager.clone(); + let storage = compaction_region.region_options.storage.clone(); + let index_options = compaction_region + .current_version + .options + .index_options + .clone(); + let append_mode = compaction_region.current_version.options.append_mode; + futs.push(async move { + let reader = build_sst_reader( + region_metadata.clone(), + sst_layer.clone(), + Some(cache_manager.clone()), + &output.inputs, + append_mode, + output.filter_deleted, + output.output_time_range, + ) + .await?; + let file_meta_opt = sst_layer + .write_sst( + SstWriteRequest { + file_id, + metadata: region_metadata, + source: Source::Reader(reader), + cache_manager, + storage, + create_inverted_index, + mem_threshold_index_create, + index_write_buffer_size, + index_options, + }, + &write_opts, + ) + .await? + .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, + }); + Ok(file_meta_opt) + }); + } + let mut output_files = Vec::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = + Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); + for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(common_runtime::spawn_bg(task)); + } + } + let metas = futures::future::try_join_all(task_chunk) + .await + .context(JoinSnafu)? + .into_iter() + .collect::>>()?; + output_files.extend(metas.into_iter().flatten()); + } + + let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); + inputs.extend( + picker_output + .expired_ssts + .iter() + .map(|f| f.meta_ref().clone()), + ); + + Ok(MergeOutput { + files_to_add: output_files, + files_to_remove: inputs, + compaction_time_window: Some(picker_output.time_window_size), + }) + } + + async fn update_manifest( + &self, + compaction_region: &CompactionRegion, + merge_output: MergeOutput, + ) -> Result { + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: merge_output.files_to_add, + files_to_remove: merge_output.files_to_remove, + compaction_time_window: merge_output + .compaction_time_window + .map(|seconds| Duration::from_secs(seconds as u64)), + flushed_entry_id: None, + flushed_sequence: None, + }; + + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later. + compaction_region + .manifest_ctx + .update_manifest(Writable, action_list) + .await?; + + Ok(edit) + } + + // The default implementation of compact combines the merge_ssts and update_manifest functions. + // Note: It's local compaction and only used for testing purpose. + async fn compact( + &self, + compaction_region: &CompactionRegion, + compact_request_options: compact_request::Options, + ) -> Result<()> { + let picker_output = { + let picker_output = new_picker( + compact_request_options, + &compaction_region.region_options.compaction, + ) + .pick(compaction_region); + + if let Some(picker_output) = picker_output { + picker_output + } else { + info!( + "No files to compact for region_id: {}", + compaction_region.region_id + ); + return Ok(()); + } + }; + + let merge_output = self.merge_ssts(compaction_region, picker_output).await?; + if merge_output.is_empty() { + info!( + "No files to compact for region_id: {}", + compaction_region.region_id + ); + return Ok(()); + } + self.update_manifest(compaction_region, merge_output) + .await?; + + Ok(()) + } +} diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index d97229e6ac21..715e8effecdc 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -15,17 +15,55 @@ use std::fmt::Debug; use std::sync::Arc; -use crate::compaction::CompactionRequest; +use api::v1::region::compact_request; -pub type CompactionPickerRef = Arc; +use crate::compaction::compactor::CompactionRegion; +use crate::compaction::twcs::TwcsPicker; +use crate::compaction::window::WindowedCompactionPicker; +use crate::compaction::CompactionOutput; +use crate::region::options::CompactionOptions; +use crate::sst::file::FileHandle; #[async_trait::async_trait] -pub trait CompactionTask: Debug + Send + Sync + 'static { +pub(crate) trait CompactionTask: Debug + Send + Sync + 'static { async fn run(&mut self); } -/// Picker picks input SST files and builds the compaction task. +/// Picker picks input SST files for compaction. /// Different compaction strategy may implement different pickers. -pub trait Picker: Debug + Send + 'static { - fn pick(&self, req: CompactionRequest) -> Option>; +pub trait Picker: Debug + Send + Sync + 'static { + /// Picks input SST files for compaction. + fn pick(&self, compaction_region: &CompactionRegion) -> Option; +} + +/// PickerOutput is the output of a [`Picker`]. +/// It contains the outputs of the compaction and the expired SST files. +#[derive(Default, Clone, Debug)] +pub struct PickerOutput { + pub outputs: Vec, + pub expired_ssts: Vec, + pub time_window_size: i64, +} + +/// Create a new picker based on the compaction request options and compaction options. +pub fn new_picker( + compact_request_options: compact_request::Options, + compaction_options: &CompactionOptions, +) -> Arc { + if let compact_request::Options::StrictWindow(window) = &compact_request_options { + let window = if window.window_seconds == 0 { + None + } else { + Some(window.window_seconds) + }; + Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> + } else { + match compaction_options { + CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( + twcs_opts.max_active_window_files, + twcs_opts.max_inactive_window_files, + twcs_opts.time_window_seconds(), + )) as Arc<_>, + } + } } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index fe7e637a9ea6..c76595097753 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -14,71 +14,51 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use common_telemetry::{error, info}; -use smallvec::SmallVec; use snafu::ResultExt; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; use tokio::sync::mpsc; -use crate::access_layer::{AccessLayerRef, SstWriteRequest}; -use crate::cache::CacheManagerRef; -use crate::compaction::picker::CompactionTask; -use crate::compaction::{build_sst_reader, CompactionOutput}; -use crate::config::MitoConfig; +use crate::compaction::compactor::{CompactionRegion, Compactor}; +use crate::compaction::picker::{CompactionTask, PickerOutput}; use crate::error; use crate::error::CompactRegionSnafu; -use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::manifest::action::RegionEdit; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; -use crate::read::Source; -use crate::region::options::IndexOptions; -use crate::region::{ManifestContextRef, RegionState}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; -use crate::sst::file::{FileHandle, FileMeta, IndexType}; -use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; -const MAX_PARALLEL_COMPACTION: usize = 8; +/// Maximum number of compaction tasks in parallel. +pub const MAX_PARALLEL_COMPACTION: usize = 8; pub(crate) struct CompactionTaskImpl { - pub engine_config: Arc, - pub region_id: RegionId, - pub metadata: RegionMetadataRef, - pub sst_layer: AccessLayerRef, - pub outputs: Vec, - pub expired_ssts: Vec, - pub compaction_time_window: Option, + pub compaction_region: CompactionRegion, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, /// Senders that are used to notify waiters waiting for pending compaction tasks. pub waiters: Vec, /// Start time of compaction task pub start_time: Instant, - pub(crate) cache_manager: CacheManagerRef, - /// Target storage of the region. - pub(crate) storage: Option, - /// Index options of the region. - pub(crate) index_options: IndexOptions, - /// The region is using append mode. - pub(crate) append_mode: bool, - /// Manifest context. - pub(crate) manifest_ctx: ManifestContextRef, /// Event listener. pub(crate) listener: WorkerListener, + /// Compactor to handle compaction. + pub(crate) compactor: Arc, + /// Output of the picker. + pub(crate) picker_output: PickerOutput, } impl Debug for CompactionTaskImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) - .field("outputs", &self.outputs) - .field("expired_ssts", &self.expired_ssts) - .field("compaction_time_window", &self.compaction_time_window) - .field("append_mode", &self.append_mode) + .field("region_id", &self.compaction_region.region_id) + .field("picker_output", &self.picker_output) + .field( + "append_mode", + &self.compaction_region.region_options.append_mode, + ) .finish() } } @@ -91,174 +71,54 @@ impl Drop for CompactionTaskImpl { impl CompactionTaskImpl { fn mark_files_compacting(&self, compacting: bool) { - self.outputs + self.picker_output + .outputs .iter() - .flat_map(|o| o.inputs.iter()) - .for_each(|f| f.set_compacting(compacting)) - } - - /// Merges all SST files. - /// Returns `(output files, input files)`. - async fn merge_ssts(&mut self) -> error::Result<(Vec, Vec)> { - let mut futs = Vec::with_capacity(self.outputs.len()); - let mut compacted_inputs = - Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); - - for output in self.outputs.drain(..) { - compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); - - info!( - "Compaction region {} output [{}]-> {}", - self.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - output.output_file_id - ); - - let write_opts = WriteOptions { - write_buffer_size: self.engine_config.sst_write_buffer_size, - ..Default::default() - }; - let create_inverted_index = self - .engine_config - .inverted_index - .create_on_compaction - .auto(); - let mem_threshold_index_create = self - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - self.engine_config - .inverted_index - .write_buffer_size - .as_bytes() as usize, - ); - - let metadata = self.metadata.clone(); - let sst_layer = self.sst_layer.clone(); - let region_id = self.region_id; - let file_id = output.output_file_id; - let cache_manager = self.cache_manager.clone(); - let storage = self.storage.clone(); - let index_options = self.index_options.clone(); - let append_mode = self.append_mode; - futs.push(async move { - let reader = build_sst_reader( - metadata.clone(), - sst_layer.clone(), - Some(cache_manager.clone()), - &output.inputs, - append_mode, - output.filter_deleted, - output.output_time_range, - ) - .await?; - let file_meta_opt = sst_layer - .write_sst( - SstWriteRequest { - file_id, - metadata, - source: Source::Reader(reader), - cache_manager, - storage, - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, - index_options, - }, - &write_opts, - ) - .await? - .map(|sst_info| FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, - }); - Ok(file_meta_opt) - }); - } - - let mut output_files = Vec::with_capacity(futs.len()); - while !futs.is_empty() { - let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); - for _ in 0..MAX_PARALLEL_COMPACTION { - if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_bg(task)); - } - } - let metas = futures::future::try_join_all(task_chunk) - .await - .context(error::JoinSnafu)? - .into_iter() - .collect::>>()?; - output_files.extend(metas.into_iter().flatten()); - } - - let inputs = compacted_inputs.into_iter().collect(); - Ok((output_files, inputs)) + .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting))); } async fn handle_compaction(&mut self) -> error::Result { self.mark_files_compacting(true); + let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) .start_timer(); - let (added, mut deleted) = match self.merge_ssts().await { + + let compaction_result = match self + .compactor + .merge_ssts(&self.compaction_region, self.picker_output.clone()) + .await + { Ok(v) => v, Err(e) => { - error!(e; "Failed to compact region: {}", self.region_id); + error!(e; "Failed to compact region: {}", self.compaction_region.region_id); merge_timer.stop_and_discard(); return Err(e); } }; - deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone())); let merge_time = merge_timer.stop_and_record(); + info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", - self.region_id, - deleted, - added, - self.compaction_time_window, + self.compaction_region.region_id, + compaction_result.files_to_remove, + compaction_result.files_to_add, + compaction_result.compaction_time_window, self.waiters.len(), merge_time, ); - self.listener.on_merge_ssts_finished(self.region_id).await; + self.listener + .on_merge_ssts_finished(self.compaction_region.region_id) + .await; let _manifest_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["write_manifest"]) .start_timer(); - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: added, - files_to_remove: deleted, - compaction_time_window: self - .compaction_time_window - .map(|seconds| Duration::from_secs(seconds as u64)), - flushed_entry_id: None, - flushed_sequence: None, - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - // We might leak files if we fail to update manifest. We can add a cleanup task to - // remove them later. - self.manifest_ctx - .update_manifest(RegionState::Writable, action_list) - .await?; - Ok(edit) + self.compactor + .update_manifest(&self.compaction_region, compaction_result) + .await } /// Handles compaction failure, notifies all waiters. @@ -266,7 +126,7 @@ impl CompactionTaskImpl { COMPACTION_FAILURE_COUNT.inc(); for waiter in self.waiters.drain(..) { waiter.send(Err(err.clone()).context(CompactRegionSnafu { - region_id: self.region_id, + region_id: self.compaction_region.region_id, })); } } @@ -276,7 +136,7 @@ impl CompactionTaskImpl { if let Err(e) = self.request_sender.send(request).await { error!( "Failed to notify compaction job status for region {}, request: {:?}", - self.region_id, e.0 + self.compaction_region.region_id, e.0 ); } } @@ -287,25 +147,25 @@ impl CompactionTask for CompactionTaskImpl { async fn run(&mut self) { let notify = match self.handle_compaction().await { Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { - region_id: self.region_id, + region_id: self.compaction_region.region_id, senders: std::mem::take(&mut self.waiters), start_time: self.start_time, edit, }), Err(e) => { - error!(e; "Failed to compact region, region id: {}", self.region_id); + error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id); let err = Arc::new(e); // notify compaction waiters self.on_failure(err.clone()); BackgroundNotify::CompactionFailed(CompactionFailed { - region_id: self.region_id, + region_id: self.compaction_region.region_id, err, }) } }; self.send_to_worker(WorkerRequest::Background { - region_id: self.region_id, + region_id: self.compaction_region.region_id, notify, }) .await; diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index f8b79cab523a..bcb5e49c2dd9 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -22,9 +22,9 @@ use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; -use crate::compaction::picker::{CompactionTask, Picker}; -use crate::compaction::task::CompactionTaskImpl; -use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::compaction::compactor::CompactionRegion; +use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::sst::file::{overlaps, FileHandle, FileId}; use crate::sst::version::LevelMeta; @@ -110,25 +110,10 @@ impl TwcsPicker { } impl Picker for TwcsPicker { - fn pick(&self, req: CompactionRequest) -> Option> { - let CompactionRequest { - engine_config, - current_version, - access_layer, - request_sender, - waiters, - start_time, - cache_manager, - manifest_ctx, - listener, - .. - } = req; - - let region_metadata = current_version.metadata.clone(); - let region_id = region_metadata.region_id; - - let levels = current_version.ssts.levels(); - let ttl = current_version.options.ttl; + fn pick(&self, compaction_region: &CompactionRegion) -> Option { + let region_id = compaction_region.region_id; + let levels = compaction_region.current_version.ssts.levels(); + let ttl = compaction_region.current_version.options.ttl; let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); if !expired_ssts.is_empty() { info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); @@ -136,7 +121,8 @@ impl Picker for TwcsPicker { expired_ssts.iter().for_each(|f| f.set_compacting(true)); } - let compaction_time_window = current_version + let compaction_time_window = compaction_region + .current_version .compaction_time_window .map(|window| window.as_secs() as i64); let time_window_size = compaction_time_window @@ -157,31 +143,14 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window); if outputs.is_empty() && expired_ssts.is_empty() { - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. - for waiter in waiters { - waiter.send(Ok(0)); - } return None; } - let task = CompactionTaskImpl { - engine_config, - region_id, - metadata: region_metadata, - sst_layer: access_layer, + + Some(PickerOutput { outputs, expired_ssts, - compaction_time_window: Some(time_window_size), - request_sender, - waiters, - start_time, - cache_manager, - storage: current_version.options.storage.clone(), - index_options: current_version.options.index_options.clone(), - append_mode: current_version.options.append_mode, - manifest_ctx, - listener, - }; - Some(Box::new(task)) + time_window_size, + }) } } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index a2b3f066efc2..1683d28f9a9c 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -23,9 +23,9 @@ use common_time::Timestamp; use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; -use crate::compaction::picker::{CompactionTask, Picker}; -use crate::compaction::task::CompactionTaskImpl; -use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::compaction::compactor::CompactionRegion; +use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; @@ -101,42 +101,18 @@ impl WindowedCompactionPicker { } impl Picker for WindowedCompactionPicker { - fn pick(&self, req: CompactionRequest) -> Option> { - let region_id = req.region_id(); - let CompactionRequest { - engine_config, - current_version, - access_layer, - request_sender, - waiters, - start_time, - cache_manager, - manifest_ctx, - listener, - } = req; - - let (outputs, expired_ssts, time_window) = - self.pick_inner(region_id, ¤t_version, Timestamp::current_millis()); - - let task = CompactionTaskImpl { - engine_config: engine_config.clone(), - region_id, - metadata: current_version.metadata.clone().clone(), - sst_layer: access_layer.clone(), + fn pick(&self, compaction_region: &CompactionRegion) -> Option { + let (outputs, expired_ssts, time_window) = self.pick_inner( + compaction_region.current_version.metadata.region_id, + &compaction_region.current_version, + Timestamp::current_millis(), + ); + + Some(PickerOutput { outputs, expired_ssts, - compaction_time_window: Some(time_window), - request_sender, - waiters, - start_time, - cache_manager, - storage: current_version.options.storage.clone(), - index_options: current_version.options.index_options.clone(), - append_mode: current_version.options.append_mode, - manifest_ctx, - listener, - }; - Some(Box::new(task)) + time_window_size: time_window, + }) } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index acf65608cef8..cdd2416940ce 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -25,7 +25,7 @@ pub mod test_util; mod access_layer; mod cache; -mod compaction; +pub mod compaction; pub mod config; pub mod engine; pub mod error; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index f046471cfc2b..e20a00d35ae1 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -530,6 +530,6 @@ where } /// Returns the directory to the manifest files. -fn new_manifest_dir(region_dir: &str) -> String { +pub(crate) fn new_manifest_dir(region_dir: &str) -> String { join_dir(region_dir, "manifest") }