From 0d5b79019397244297093fdeabb553be56eafc91 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Sun, 2 Jun 2024 16:47:56 +0800 Subject: [PATCH 01/31] refactor: add Compactor trait --- src/mito2/src/compaction.rs | 89 +++++---- src/mito2/src/compaction/compactor.rs | 278 ++++++++++++++++++++++++++ src/mito2/src/compaction/picker.rs | 42 +++- src/mito2/src/compaction/task.rs | 247 ++++++++--------------- src/mito2/src/compaction/twcs.rs | 52 +---- src/mito2/src/compaction/window.rs | 53 ++--- 6 files changed, 469 insertions(+), 292 deletions(-) create mode 100644 src/mito2/src/compaction/compactor.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index dfdb8a4aa97f..e118e05246c2 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -13,6 +13,7 @@ // limitations under the License. mod buckets; +pub mod compactor; mod picker; mod task; #[cfg(test)] @@ -31,7 +32,6 @@ use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datafusion_common::ScalarValue; use datafusion_expr::Expr; -pub use picker::CompactionPickerRef; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -40,19 +40,18 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; -use crate::compaction::twcs::TwcsPicker; -use crate::compaction::window::WindowedCompactionPicker; +use crate::compaction::compactor::DefaultCompactor; +use crate::compaction::picker::{new_picker, CompactionTask}; +use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, TimeRangePredicateOverflowSnafu, }; -use crate::metrics::COMPACTION_STAGE_ELAPSED; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; use crate::read::BoxedBatchReader; -use 
crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; @@ -93,17 +92,6 @@ impl CompactionRequest { } } -/// Builds compaction picker according to [CompactionOptions]. -pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef { - match strategy { - CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( - twcs_opts.max_active_window_files, - twcs_opts.max_inactive_window_files, - twcs_opts.time_window_seconds(), - )) as Arc<_>, - } -} - /// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, @@ -240,34 +228,13 @@ impl CompactionScheduler { request: CompactionRequest, options: compact_request::Options, ) -> Result<()> { - let picker = if let compact_request::Options::StrictWindow(window) = &options { - let window = if window.window_seconds == 0 { - None - } else { - Some(window.window_seconds) - }; - Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> - } else { - compaction_options_to_picker(&request.current_version.options.compaction) - }; - let region_id = request.region_id(); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, region_id - ); - - let pick_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick"]) - .start_timer(); - let Some(mut task) = picker.pick(request) else { + let Some(mut task) = build_compaction_task(request, options) else { // Nothing to compact, remove it from the region status map. self.region_status.remove(&region_id); return Ok(()); }; - drop(pick_timer); - // Submit the compaction task. self.scheduler .schedule(Box::pin(async move { @@ -409,7 +376,7 @@ impl CompactionStatus { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct CompactionOutput { pub output_file_id: FileId, /// Compaction output file level. 
@@ -530,6 +497,50 @@ fn get_expired_ssts( .collect() } +fn build_compaction_task( + req: CompactionRequest, + options: compact_request::Options, +) -> Option> { + let picker = new_picker(options, &req.current_version.options.compaction); + let region_id = req.region_id(); + let CompactionRequest { + engine_config, + current_version, + access_layer, + request_sender, + waiters, + file_purger, + start_time, + cache_manager, + manifest_ctx, + version_control, + listener, + } = req; + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + + let task = CompactionTaskImpl { + engine_config: engine_config.clone(), + region_id, + metadata: current_version.metadata.clone().clone(), + sst_layer: access_layer.clone(), + request_sender, + waiters, + file_purger, + start_time, + cache_manager, + append_mode: current_version.options.append_mode, + manifest_ctx, + version_control, + listener, + compactor: Arc::new(DefaultCompactor::new_with_picker(picker)), + picker_output: None, + }; + Some(Box::new(task)) +} + #[cfg(test)] mod tests { use std::sync::Mutex; diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs new file mode 100644 index 000000000000..062004d90cb7 --- /dev/null +++ b/src/mito2/src/compaction/compactor.rs @@ -0,0 +1,278 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; +use std::time::Duration; + +use common_telemetry::info; +use smallvec::SmallVec; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::cache::CacheManagerRef; +use crate::compaction::build_sst_reader; +use crate::compaction::picker::{Picker, PickerOutput}; +use crate::config::MitoConfig; +use crate::error; +use crate::error::Result; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::read::Source; +use crate::region::options::RegionOptions; +use crate::region::version::VersionControlRef; +use crate::region::ManifestContext; +use crate::region::RegionState::Writable; +use crate::sst::file::{FileMeta, IndexType}; +use crate::sst::file_purger::FilePurgerRef; +use crate::sst::parquet::WriteOptions; + +#[derive(Clone)] +/// CompactionRegion represents a region that needs to be compacted. +pub struct CompactionRegion { + pub region_id: RegionId, + pub region_options: RegionOptions, + pub engine_config: Arc, + pub region_metadata: RegionMetadataRef, + pub manifest_ctx: Arc, + pub cache_manager: CacheManagerRef, + pub access_layer: AccessLayerRef, + pub version_control: VersionControlRef, + pub file_purger: FilePurgerRef, +} + +/// MergeOutput represents the output of merging SST files. 
+pub struct MergeOutput { + pub files_to_add: Option>, + pub fileds_to_remove: Option>, + pub compaction_time_window: Option, +} + +impl MergeOutput { + pub fn is_empty(&self) -> bool { + self.files_to_add.is_none() && self.fileds_to_remove.is_none() + } +} + +#[async_trait::async_trait] +pub trait Compactor: Send + Sync + 'static { + async fn merge_ssts( + &self, + compaction_region: CompactionRegion, + picker_output: Option, + ) -> Result { + if let Some(picker_output) = picker_output { + do_merge_ssts(compaction_region, picker_output).await + } else { + Ok(MergeOutput { + files_to_add: None, + fileds_to_remove: None, + compaction_time_window: None, + }) + } + } + + async fn update_manifest( + &self, + compaction_region: CompactionRegion, + compaction_result: MergeOutput, + ) -> Result<()> { + let files_to_add = { + if let Some(files) = compaction_result.files_to_add { + files + } else { + return Ok(()); + } + }; + + let files_to_remove = { + if let Some(files) = compaction_result.fileds_to_remove { + files + } else { + return Ok(()); + } + }; + + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add, + files_to_remove, + compaction_time_window: compaction_result + .compaction_time_window + .map(|seconds| Duration::from_secs(seconds as u64)), + flushed_entry_id: None, + flushed_sequence: None, + }; + + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + // We might leak files if we fail to update manifest. We can add a cleanup task to + // remove them later. 
+ compaction_region + .manifest_ctx + .update_manifest(Writable, action_list, || { + compaction_region.version_control.apply_edit( + edit, + &[], + compaction_region.file_purger.clone(), + ); + }) + .await?; + Ok(()) + } + + fn picker(&self) -> Arc; +} + +async fn do_merge_ssts( + compaction_region: CompactionRegion, + mut picker_output: PickerOutput, +) -> Result { + let current_version = compaction_region.version_control.current().version; + let mut futs = Vec::with_capacity(picker_output.outputs.len()); + let mut compacted_inputs = + Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); + for output in picker_output.outputs.drain(..) { + compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); + + info!( + "Compaction region {} output [{}]-> {}", + compaction_region.region_id, + output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .collect::>() + .join(","), + output.output_file_id + ); + + let write_opts = WriteOptions { + write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + ..Default::default() + }; + let create_inverted_index = compaction_region + .engine_config + .inverted_index + .create_on_compaction + .auto(); + let mem_threshold_index_create = compaction_region + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| m.as_bytes() as _); + let index_write_buffer_size = Some( + compaction_region + .engine_config + .inverted_index + .write_buffer_size + .as_bytes() as usize, + ); + + let region_metadata = compaction_region.region_metadata.clone(); + let sst_layer = compaction_region.access_layer.clone(); + let region_id = compaction_region.region_id; + let file_id = output.output_file_id; + let cache_manager = compaction_region.cache_manager.clone(); + let storage = compaction_region.region_options.storage.clone(); + let index_options = current_version.options.index_options.clone(); + let append_mode = current_version.options.append_mode; + futs.push(async move 
{ + let reader = build_sst_reader( + region_metadata.clone(), + sst_layer.clone(), + Some(cache_manager.clone()), + &output.inputs, + append_mode, + output.filter_deleted, + output.output_time_range, + ) + .await?; + let file_meta_opt = sst_layer + .write_sst( + SstWriteRequest { + file_id, + metadata: region_metadata, + source: Source::Reader(reader), + cache_manager, + storage, + create_inverted_index, + mem_threshold_index_create, + index_write_buffer_size, + index_options, + }, + &write_opts, + ) + .await? + .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, + }); + Ok(file_meta_opt) + }); + } + let mut output_files = Vec::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); + for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(common_runtime::spawn_bg(task)); + } + } + let metas = futures::future::try_join_all(task_chunk) + .await + .context(error::JoinSnafu)? 
+ .into_iter() + .collect::>>()?; + output_files.extend(metas.into_iter().flatten()); + } + + let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); + inputs.extend( + picker_output + .expired_ssts + .iter() + .map(|f| f.meta_ref().clone()), + ); + + Ok(MergeOutput { + files_to_add: Some(output_files), + fileds_to_remove: Some(inputs), + compaction_time_window: Some(picker_output.time_window_size), + }) +} + +pub struct DefaultCompactor { + picker: Arc, +} + +#[async_trait::async_trait] +impl Compactor for DefaultCompactor { + fn picker(&self) -> Arc { + self.picker.clone() + } +} + +impl DefaultCompactor { + pub fn new_with_picker(picker: Arc) -> Self { + Self { picker } + } +} diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index d97229e6ac21..7501a56e694a 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -15,9 +15,14 @@ use std::fmt::Debug; use std::sync::Arc; -use crate::compaction::CompactionRequest; +use api::v1::region::compact_request; -pub type CompactionPickerRef = Arc; +use crate::compaction::twcs::TwcsPicker; +use crate::compaction::window::WindowedCompactionPicker; +use crate::compaction::CompactionOutput; +use crate::region::options::CompactionOptions; +use crate::region::version::VersionRef; +use crate::sst::file::FileHandle; #[async_trait::async_trait] pub trait CompactionTask: Debug + Send + Sync + 'static { @@ -26,6 +31,35 @@ pub trait CompactionTask: Debug + Send + Sync + 'static { /// Picker picks input SST files and builds the compaction task. /// Different compaction strategy may implement different pickers. 
-pub trait Picker: Debug + Send + 'static { - fn pick(&self, req: CompactionRequest) -> Option>; +pub trait Picker: Debug + Send + Sync + 'static { + fn pick(&self, current_version: VersionRef) -> Option; +} + +#[derive(Clone)] +pub struct PickerOutput { + pub outputs: Vec, + pub expired_ssts: Vec, + pub time_window_size: i64, +} + +pub fn new_picker( + compact_request_options: compact_request::Options, + compaction_options: &CompactionOptions, +) -> Arc { + if let compact_request::Options::StrictWindow(window) = &compact_request_options { + let window = if window.window_seconds == 0 { + None + } else { + Some(window.window_seconds) + }; + Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> + } else { + match compaction_options { + CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( + twcs_opts.max_active_window_files, + twcs_opts.max_inactive_window_files, + twcs_opts.time_window_seconds(), + )) as Arc<_>, + } + } } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index a675ceafb32a..9f9676fe1329 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -14,46 +14,37 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use common_telemetry::{error, info}; -use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; -use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; -use crate::compaction::picker::CompactionTask; -use crate::compaction::{build_sst_reader, CompactionOutput}; +use crate::compaction::compactor::{CompactionRegion, Compactor}; +use crate::compaction::picker::{CompactionTask, PickerOutput}; use crate::config::MitoConfig; use crate::error; use crate::error::CompactRegionSnafu; -use crate::manifest::action::{RegionEdit, RegionMetaAction, 
RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; -use crate::read::Source; -use crate::region::options::IndexOptions; use crate::region::version::VersionControlRef; -use crate::region::{ManifestContextRef, RegionState}; +use crate::region::ManifestContextRef; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; -use crate::sst::file::{FileHandle, FileMeta, IndexType}; use crate::sst::file_purger::FilePurgerRef; -use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; -const MAX_PARALLEL_COMPACTION: usize = 8; +pub const MAX_PARALLEL_COMPACTION: usize = 8; pub(crate) struct CompactionTaskImpl { pub engine_config: Arc, pub region_id: RegionId, pub metadata: RegionMetadataRef, pub sst_layer: AccessLayerRef, - pub outputs: Vec, - pub expired_ssts: Vec, - pub compaction_time_window: Option, pub file_purger: FilePurgerRef, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, @@ -62,10 +53,6 @@ pub(crate) struct CompactionTaskImpl { /// Start time of compaction task pub start_time: Instant, pub(crate) cache_manager: CacheManagerRef, - /// Target storage of the region. - pub(crate) storage: Option, - /// Index options of the region. - pub(crate) index_options: IndexOptions, /// The region is using append mode. pub(crate) append_mode: bool, /// Manifest context. @@ -74,17 +61,27 @@ pub(crate) struct CompactionTaskImpl { pub(crate) version_control: VersionControlRef, /// Event listener. 
pub(crate) listener: WorkerListener, + + pub(crate) compactor: Arc, + pub(crate) picker_output: Option, } impl Debug for CompactionTaskImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) - .field("outputs", &self.outputs) - .field("expired_ssts", &self.expired_ssts) - .field("compaction_time_window", &self.compaction_time_window) - .field("append_mode", &self.append_mode) - .finish() + if let Some(picker_output) = &self.picker_output { + f.debug_struct("TwcsCompactionTask") + .field("region_id", &self.region_id) + .field("outputs", &picker_output.outputs) + .field("expired_ssts", &picker_output.expired_ssts) + .field("compaction_time_window", &picker_output.time_window_size) + .field("append_mode", &self.append_mode) + .finish() + } else { + f.debug_struct("TwcsCompactionTask") + .field("region_id", &self.region_id) + .field("append_mode", &self.append_mode) + .finish() + } } } @@ -96,132 +93,56 @@ impl Drop for CompactionTaskImpl { impl CompactionTaskImpl { fn mark_files_compacting(&self, compacting: bool) { - self.outputs - .iter() - .flat_map(|o| o.inputs.iter()) - .for_each(|f| f.set_compacting(compacting)) - } - - /// Merges all SST files. - /// Returns `(output files, input files)`. - async fn merge_ssts(&mut self) -> error::Result<(Vec, Vec)> { - let mut futs = Vec::with_capacity(self.outputs.len()); - let mut compacted_inputs = - Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); - - for output in self.outputs.drain(..) 
{ - compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); - - info!( - "Compaction region {} output [{}]-> {}", - self.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - output.output_file_id - ); - - let write_opts = WriteOptions { - write_buffer_size: self.engine_config.sst_write_buffer_size, - ..Default::default() - }; - let create_inverted_index = self - .engine_config - .inverted_index - .create_on_compaction - .auto(); - let mem_threshold_index_create = self - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - self.engine_config - .inverted_index - .write_buffer_size - .as_bytes() as usize, - ); - - let metadata = self.metadata.clone(); - let sst_layer = self.sst_layer.clone(); - let region_id = self.region_id; - let file_id = output.output_file_id; - let cache_manager = self.cache_manager.clone(); - let storage = self.storage.clone(); - let index_options = self.index_options.clone(); - let append_mode = self.append_mode; - futs.push(async move { - let reader = build_sst_reader( - metadata.clone(), - sst_layer.clone(), - Some(cache_manager.clone()), - &output.inputs, - append_mode, - output.filter_deleted, - output.output_time_range, - ) - .await?; - let file_meta_opt = sst_layer - .write_sst( - SstWriteRequest { - file_id, - metadata, - source: Source::Reader(reader), - cache_manager, - storage, - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, - index_options, - }, - &write_opts, - ) - .await? 
- .map(|sst_info| FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, - }); - Ok(file_meta_opt) - }); + if let Some(picker_output) = &self.picker_output { + picker_output + .outputs + .iter() + .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting))); } + } - let mut output_files = Vec::with_capacity(futs.len()); - while !futs.is_empty() { - let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); - for _ in 0..MAX_PARALLEL_COMPACTION { - if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_bg(task)); - } + async fn handle_compaction(&mut self) -> error::Result<()> { + let pick_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick"]) + .start_timer(); + let picker_output = self + .compactor + .picker() + .pick(self.version_control.current().version); + drop(pick_timer); + + if picker_output.is_none() { + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. + for waiter in self.waiters.drain(..) { + waiter.send(Ok(0)); } - let metas = futures::future::try_join_all(task_chunk) - .await - .context(error::JoinSnafu)? 
- .into_iter() - .collect::>>()?; - output_files.extend(metas.into_iter().flatten()); + return Ok(()); } + self.picker_output = picker_output; - let inputs = compacted_inputs.into_iter().collect(); - Ok((output_files, inputs)) - } - - async fn handle_compaction(&mut self) -> error::Result<()> { self.mark_files_compacting(true); + let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) .start_timer(); - let (added, mut deleted) = match self.merge_ssts().await { + + let compaction_region = CompactionRegion { + region_id: self.region_id, + region_options: self.version_control.current().version.options.clone(), + engine_config: self.engine_config.clone(), + region_metadata: self.metadata.clone(), + manifest_ctx: self.manifest_ctx.clone(), + version_control: self.version_control.clone(), + access_layer: self.sst_layer.clone(), + cache_manager: self.cache_manager.clone(), + file_purger: self.file_purger.clone(), + }; + + let compaction_result = match self + .compactor + .merge_ssts(compaction_region.clone(), self.picker_output.clone()) + .await + { Ok(v) => v, Err(e) => { error!(e; "Failed to compact region: {}", self.region_id); @@ -229,14 +150,23 @@ impl CompactionTaskImpl { return Err(e); } }; - deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone())); + let merge_time = merge_timer.stop_and_record(); + + if compaction_result.is_empty() { + info!( + "No files to compact, region_id: {}, window: {:?}", + self.region_id, compaction_result.compaction_time_window + ); + return Ok(()); + } + info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", self.region_id, - deleted, - added, - self.compaction_time_window, + compaction_result.fileds_to_remove, + compaction_result.files_to_add, + compaction_result.compaction_time_window, self.waiters.len(), merge_time, ); @@ -246,25 +176,10 @@ impl CompactionTaskImpl { let _manifest_timer = COMPACTION_STAGE_ELAPSED 
.with_label_values(&["write_manifest"]) .start_timer(); - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: added, - files_to_remove: deleted, - compaction_time_window: self - .compaction_time_window - .map(|seconds| Duration::from_secs(seconds as u64)), - flushed_entry_id: None, - flushed_sequence: None, - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - // We might leak files if we fail to update manifest. We can add a cleanup task to - // remove them later. - self.manifest_ctx - .update_manifest(RegionState::Writable, action_list, || { - self.version_control - .apply_edit(edit, &[], self.file_purger.clone()); - }) - .await + self.compactor + .update_manifest(compaction_region.clone(), compaction_result) + .await?; + Ok(()) } /// Handles compaction failure, notifies all waiters. diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 7c6bf0827574..cfd6f763d1fa 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -22,9 +22,9 @@ use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; -use crate::compaction::picker::{CompactionTask, Picker}; -use crate::compaction::task::CompactionTaskImpl; -use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::{get_expired_ssts, CompactionOutput}; +use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; use crate::sst::version::LevelMeta; @@ -110,22 +110,7 @@ impl TwcsPicker { } impl Picker for TwcsPicker { - fn pick(&self, req: CompactionRequest) -> Option> { - let CompactionRequest { - engine_config, - current_version, - access_layer, - request_sender, - waiters, - file_purger, - start_time, - cache_manager, - manifest_ctx, - version_control, - listener, - .. 
- } = req; - + fn pick(&self, current_version: VersionRef) -> Option { let region_metadata = current_version.metadata.clone(); let region_id = region_metadata.region_id; @@ -159,33 +144,16 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window); if outputs.is_empty() && expired_ssts.is_empty() { - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. - for waiter in waiters { - waiter.send(Ok(0)); - } + // Nothing to compact, we are done. + // You can notifiy all waiters as we consume the compaction request. return None; } - let task = CompactionTaskImpl { - engine_config, - region_id, - metadata: region_metadata, - sst_layer: access_layer, + + Some(PickerOutput { outputs, expired_ssts, - compaction_time_window: Some(time_window_size), - request_sender, - waiters, - file_purger, - start_time, - cache_manager, - storage: current_version.options.storage.clone(), - index_options: current_version.options.index_options.clone(), - append_mode: current_version.options.append_mode, - manifest_ctx, - version_control, - listener, - }; - Some(Box::new(task)) + time_window_size, + }) } } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 2f0ff49c7f16..8cc42baa307d 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -23,9 +23,8 @@ use common_time::Timestamp; use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; -use crate::compaction::picker::{CompactionTask, Picker}; -use crate::compaction::task::CompactionTaskImpl; -use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; @@ -101,46 +100,18 @@ impl WindowedCompactionPicker { } impl Picker for WindowedCompactionPicker { - fn 
pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> { - let region_id = req.region_id(); - let CompactionRequest { - engine_config, - current_version, - access_layer, - request_sender, - waiters, - file_purger, - start_time, - cache_manager, - manifest_ctx, - version_control, - listener, - } = req; - - let (outputs, expired_ssts, time_window) = - self.pick_inner(region_id, &current_version, Timestamp::current_millis()); - - let task = CompactionTaskImpl { - engine_config: engine_config.clone(), - region_id, - metadata: current_version.metadata.clone().clone(), - sst_layer: access_layer.clone(), + fn pick(&self, current_version: VersionRef) -> Option<PickerOutput> { + let (outputs, expired_ssts, time_window) = self.pick_inner( + current_version.metadata.region_id, + &current_version, + Timestamp::current_millis(), + ); + + Some(PickerOutput { outputs, expired_ssts, - compaction_time_window: Some(time_window), - request_sender, - waiters, - file_purger, - start_time, - cache_manager, - storage: current_version.options.storage.clone(), - index_options: current_version.options.index_options.clone(), - append_mode: current_version.options.append_mode, - manifest_ctx, - version_control, - listener, - }; - Some(Box::new(task)) + time_window_size: time_window, + }) } } From a91c5d666f64fff8fb9cdf9d1adcbb4b7acfa95e Mon Sep 17 00:00:00 2001 From: zyy17 Date: Sun, 2 Jun 2024 23:19:27 +0800 Subject: [PATCH 02/31] chore: add compact() in Compactor trait and expose compaction module --- src/mito2/src/compaction/compactor.rs | 25 +++++++++++++++++++++---- src/mito2/src/lib.rs | 2 +- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 062004d90cb7..129c0f738d18 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -86,10 +86,10 @@ pub trait Compactor: Send + Sync + 'static { async fn update_manifest( &self, compaction_region: CompactionRegion, - compaction_result: 
MergeOutput, + merge_output: MergeOutput, ) -> Result<()> { let files_to_add = { - if let Some(files) = compaction_result.files_to_add { + if let Some(files) = merge_output.files_to_add { files } else { return Ok(()); @@ -97,7 +97,7 @@ pub trait Compactor: Send + Sync + 'static { }; let files_to_remove = { - if let Some(files) = compaction_result.fileds_to_remove { + if let Some(files) = merge_output.fileds_to_remove { files } else { return Ok(()); @@ -108,7 +108,7 @@ pub trait Compactor: Send + Sync + 'static { let edit = RegionEdit { files_to_add, files_to_remove, - compaction_time_window: compaction_result + compaction_time_window: merge_output .compaction_time_window .map(|seconds| Duration::from_secs(seconds as u64)), flushed_entry_id: None, @@ -132,6 +132,23 @@ pub trait Compactor: Send + Sync + 'static { } fn picker(&self) -> Arc; + + async fn compact(&self, compaction_region: CompactionRegion) -> Result<()> { + let picker_output = self + .picker() + .pick(compaction_region.version_control.current().version); + let merge_output = self + .merge_ssts(compaction_region.clone(), picker_output) + .await?; + if merge_output.is_empty() { + info!( + "No files to compact for region_id: {}", + compaction_region.region_id + ); + return Ok(()); + } + self.update_manifest(compaction_region, merge_output).await + } } async fn do_merge_ssts( diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index acf65608cef8..cdd2416940ce 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -25,7 +25,7 @@ pub mod test_util; mod access_layer; mod cache; -mod compaction; +pub mod compaction; pub mod config; pub mod engine; pub mod error; From 37b0464db229b779d2d69208072cf60744c050e5 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Sun, 2 Jun 2024 23:58:52 +0800 Subject: [PATCH 03/31] refactor: add CompactionRequest and open_compaction_region --- src/mito2/src/compaction/compactor.rs | 144 ++++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 7 deletions(-) diff --git 
a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 129c0f738d18..c9ee6b3a60cc 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,30 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use api::v1::region::compact_request; use common_telemetry::info; +use object_store::manager::ObjectStoreManager; +use object_store::util::join_dir; use smallvec::SmallVec; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; +use store_api::path_utils::region_dir; use store_api::storage::RegionId; -use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; use crate::cache::CacheManagerRef; use crate::compaction::build_sst_reader; use crate::compaction::picker::{Picker, PickerOutput}; use crate::config::MitoConfig; -use crate::error; -use crate::error::Result; +use crate::error::{JoinSnafu, ObjectStoreNotFoundSnafu, Result}; +use crate::flush::WriteBufferManagerImpl; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::manifest::storage::manifest_compress_type; +use crate::memtable::time_partition::TimePartitions; +use crate::memtable::MemtableBuilderProvider; use crate::read::Source; use crate::region::options::RegionOptions; -use crate::region::version::VersionControlRef; +use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::ManifestContext; use crate::region::RegionState::Writable; +use crate::schedule::scheduler::LocalScheduler; use crate::sst::file::{FileMeta, IndexType}; -use crate::sst::file_purger::FilePurgerRef; +use crate::sst::file_purger::{FilePurgerRef, 
LocalFilePurger}; +use crate::sst::index::intermediate::IntermediateManager; use crate::sst::parquet::WriteOptions; #[derive(Clone)] @@ -52,6 +63,125 @@ pub struct CompactionRegion { pub file_purger: FilePurgerRef, } +pub struct CompactionRequest { + pub catalog: String, + pub schema: String, + pub region_id: RegionId, + pub options: HashMap, + pub compaction_options: compact_request::Options, +} + +pub async fn open_compaction_region( + req: CompactionRequest, + mito_config: Arc, + object_store_manager: ObjectStoreManager, + cache_manager: CacheManagerRef, +) -> Result { + let region_options = RegionOptions::try_from(&req.options)?; + let region_dir = region_dir( + format!("{}/{}", &req.catalog, &req.schema).as_str(), + req.region_id, + ); + + let object_store = { + let name = ®ion_options.storage; + if let Some(name) = name { + object_store_manager + .find(name) + .context(ObjectStoreNotFoundSnafu { + object_store: name.to_string(), + })? + } else { + object_store_manager.default_object_store() + } + }; + + let access_layer = { + // TODO(zyy17): Should we really need to create intermediate manager here? + let intermediate_manager = + IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) + .await?; + + Arc::new(AccessLayer::new( + region_dir.to_string(), + object_store.clone(), + intermediate_manager, + )) + }; + + let manifest_manager = { + let region_manifest_options = RegionManifestOptions { + // TODO(zyy17): It needs to add a new function instead of using join_dir. + manifest_dir: join_dir(®ion_dir, "manifest"), + object_store: object_store.clone(), + compress_type: manifest_compress_type(mito_config.compress_manifest), + checkpoint_distance: mito_config.manifest_checkpoint_distance, + }; + + // TODO(zyy17): handle error. + RegionManifestManager::open(region_manifest_options, Default::default()) + .await? 
+ .unwrap() + }; + + let manifest = manifest_manager.manifest(); + let region_metadata = manifest.metadata.clone(); + let manifest_ctx = Arc::new(ManifestContext::new(manifest_manager, Writable)); + + let file_purger = { + let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs)); + Arc::new(LocalFilePurger::new( + purge_scheduler.clone(), + access_layer.clone(), + None, + )) + }; + + let version_control = { + let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( + mito_config.global_write_buffer_size.as_bytes() as usize, + )); + let memtable_builder_provider = + MemtableBuilderProvider::new(Some(write_buffer_manager.clone()), mito_config.clone()); + + let memtable_builder = memtable_builder_provider.builder_for_options( + region_options.memtable.as_ref(), + !region_options.append_mode, + ); + + // Initial memtable id is 0. + let part_duration = region_options.compaction.time_window(); + let mutable = Arc::new(TimePartitions::new( + region_metadata.clone(), + memtable_builder.clone(), + 0, + part_duration, + )); + + let version = VersionBuilder::new(region_metadata.clone(), mutable) + .add_files(file_purger.clone(), manifest.files.values().cloned()) + .flushed_entry_id(manifest.flushed_entry_id) + .flushed_sequence(manifest.flushed_sequence) + .truncated_entry_id(manifest.truncated_entry_id) + .compaction_time_window(manifest.compaction_time_window) + .options(region_options.clone()) + .build(); + Arc::new(VersionControl::new(version)) + }; + + Ok(CompactionRegion { + region_options: region_options.clone(), + manifest_ctx, + access_layer, + version_control, + region_id: req.region_id, + cache_manager: cache_manager.clone(), + engine_config: mito_config.clone(), + region_metadata: region_metadata.clone(), + file_purger: file_purger.clone(), + }) +} + /// MergeOutput represents the output of merging SST files. 
pub struct MergeOutput { pub files_to_add: Option>, @@ -256,7 +386,7 @@ async fn do_merge_ssts( } let metas = futures::future::try_join_all(task_chunk) .await - .context(error::JoinSnafu)? + .context(JoinSnafu)? .into_iter() .collect::>>()?; output_files.extend(metas.into_iter().flatten()); From 014bc22f4854b96940bc6ab5e35c250812706d85 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 3 Jun 2024 18:28:27 +0800 Subject: [PATCH 04/31] refactor: export the compaction api --- src/mito2/src/compaction.rs | 4 +-- src/mito2/src/compaction/compactor.rs | 43 +++++++++++++-------------- src/mito2/src/lib.rs | 6 +++- src/mito2/src/region/version.rs | 2 +- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index e118e05246c2..d038368182dd 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -14,7 +14,7 @@ mod buckets; pub mod compactor; -mod picker; +pub mod picker; mod task; #[cfg(test)] mod test_util; @@ -377,7 +377,7 @@ impl CompactionStatus { } #[derive(Debug, Clone)] -pub(crate) struct CompactionOutput { +pub struct CompactionOutput { pub output_file_id: FileId, /// Compaction output file level. 
pub output_level: Level, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index c9ee6b3a60cc..7eb898edc104 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -27,12 +27,11 @@ use store_api::path_utils::region_dir; use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; -use crate::cache::CacheManagerRef; +use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::build_sst_reader; use crate::compaction::picker::{Picker, PickerOutput}; use crate::config::MitoConfig; -use crate::error::{JoinSnafu, ObjectStoreNotFoundSnafu, Result}; -use crate::flush::WriteBufferManagerImpl; +use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; @@ -51,18 +50,21 @@ use crate::sst::parquet::WriteOptions; #[derive(Clone)] /// CompactionRegion represents a region that needs to be compacted. +/// It's the subset of MitoRegion. pub struct CompactionRegion { pub region_id: RegionId, pub region_options: RegionOptions, pub engine_config: Arc, pub region_metadata: RegionMetadataRef, - pub manifest_ctx: Arc, pub cache_manager: CacheManagerRef, pub access_layer: AccessLayerRef, - pub version_control: VersionControlRef, pub file_purger: FilePurgerRef, + + pub(crate) manifest_ctx: Arc, + pub(crate) version_control: VersionControlRef, } +/// CompactionRequest represents the request to compact a region. pub struct CompactionRequest { pub catalog: String, pub schema: String, @@ -71,11 +73,12 @@ pub struct CompactionRequest { pub compaction_options: compact_request::Options, } +/// Open a compaction region from a compaction request. +/// It's simple version of RegionOpener::open(). 
pub async fn open_compaction_region( req: CompactionRequest, mito_config: Arc, object_store_manager: ObjectStoreManager, - cache_manager: CacheManagerRef, ) -> Result { let region_options = RegionOptions::try_from(&req.options)?; let region_dir = region_dir( @@ -97,7 +100,6 @@ pub async fn open_compaction_region( }; let access_layer = { - // TODO(zyy17): Should we really need to create intermediate manager here? let intermediate_manager = IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) .await?; @@ -111,17 +113,18 @@ pub async fn open_compaction_region( let manifest_manager = { let region_manifest_options = RegionManifestOptions { - // TODO(zyy17): It needs to add a new function instead of using join_dir. manifest_dir: join_dir(®ion_dir, "manifest"), object_store: object_store.clone(), compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, }; - // TODO(zyy17): handle error. RegionManifestManager::open(region_manifest_options, Default::default()) .await? - .unwrap() + .context(EmptyRegionDirSnafu { + region_id: req.region_id, + region_dir, + })? }; let manifest = manifest_manager.manifest(); @@ -138,24 +141,18 @@ pub async fn open_compaction_region( }; let version_control = { - let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( - mito_config.global_write_buffer_size.as_bytes() as usize, - )); - let memtable_builder_provider = - MemtableBuilderProvider::new(Some(write_buffer_manager.clone()), mito_config.clone()); - - let memtable_builder = memtable_builder_provider.builder_for_options( - region_options.memtable.as_ref(), - !region_options.append_mode, - ); + let memtable_builder = MemtableBuilderProvider::new(None, mito_config.clone()) + .builder_for_options( + region_options.memtable.as_ref(), + !region_options.append_mode, + ); // Initial memtable id is 0. 
- let part_duration = region_options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( region_metadata.clone(), memtable_builder.clone(), 0, - part_duration, + region_options.compaction.time_window(), )); let version = VersionBuilder::new(region_metadata.clone(), mutable) @@ -175,7 +172,7 @@ pub async fn open_compaction_region( access_layer, version_control, region_id: req.region_id, - cache_manager: cache_manager.clone(), + cache_manager: Arc::new(CacheManager::default()), engine_config: mito_config.clone(), region_metadata: region_metadata.clone(), file_purger: file_purger.clone(), diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index cdd2416940ce..c7a6e142cee1 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -25,7 +25,7 @@ pub mod test_util; mod access_layer; mod cache; -pub mod compaction; +mod compaction; pub mod config; pub mod engine; pub mod error; @@ -44,6 +44,10 @@ mod time_provider; pub mod wal; mod worker; +pub use compaction::compactor::*; +pub use compaction::picker::*; +pub use compaction::CompactionOutput; + #[cfg_attr(doc, aquamarine::aquamarine)] /// # Mito developer document /// diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index fa95255c1a5c..a0087adf1f8e 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -220,7 +220,7 @@ pub(crate) struct VersionControlData { /// Static metadata of a region. #[derive(Clone, Debug)] -pub(crate) struct Version { +pub struct Version { /// Metadata of the region. 
/// /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing From f9d1e66fdf41155dd94bc582a77ca4926daa1ce3 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 3 Jun 2024 18:39:54 +0800 Subject: [PATCH 05/31] refactor: add DefaultCompactor::new_from_request --- src/mito2/src/compaction/compactor.rs | 8 ++++++++ src/mito2/src/compaction/picker.rs | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 7eb898edc104..c73f3a5df7f5 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -37,6 +37,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; +use crate::new_picker; use crate::read::Source; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; @@ -419,4 +420,11 @@ impl DefaultCompactor { pub fn new_with_picker(picker: Arc) -> Self { Self { picker } } + + pub fn new_from_request(req: &CompactionRequest) -> Result { + let region_options = RegionOptions::try_from(&req.options)?; + let compaction_options = region_options.compaction; + let picker = new_picker(req.compaction_options.clone(), &compaction_options); + Ok(Self { picker }) + } } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 7501a56e694a..a47e4fde79cf 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -42,7 +42,7 @@ pub struct PickerOutput { pub time_window_size: i64, } -pub fn new_picker( +pub(crate) fn new_picker( compact_request_options: compact_request::Options, compaction_options: &CompactionOptions, ) -> Arc { From 3eb0dab1a2389147703ca7f9c1b71226f992773d Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 3 Jun 2024 
18:53:33 +0800 Subject: [PATCH 06/31] refactor: no need to pass mito_config in open_compaction_region() --- src/mito2/src/compaction/compactor.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index c73f3a5df7f5..fe7c329511ac 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -78,7 +78,7 @@ pub struct CompactionRequest { /// It's simple version of RegionOpener::open(). pub async fn open_compaction_region( req: CompactionRequest, - mito_config: Arc, + data_home: &str, object_store_manager: ObjectStoreManager, ) -> Result { let region_options = RegionOptions::try_from(&req.options)?; @@ -100,6 +100,9 @@ pub async fn open_compaction_region( } }; + let mut mito_config = MitoConfig::default(); + mito_config.sanitize(data_home)?; + let access_layer = { let intermediate_manager = IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) @@ -142,7 +145,7 @@ pub async fn open_compaction_region( }; let version_control = { - let memtable_builder = MemtableBuilderProvider::new(None, mito_config.clone()) + let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone())) .builder_for_options( region_options.memtable.as_ref(), !region_options.append_mode, @@ -174,7 +177,7 @@ pub async fn open_compaction_region( version_control, region_id: req.region_id, cache_manager: Arc::new(CacheManager::default()), - engine_config: mito_config.clone(), + engine_config: Arc::new(mito_config.clone()), region_metadata: region_metadata.clone(), file_purger: file_purger.clone(), }) From 8cd55a25753e045d7ab217820ee5eb943eb649fa Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 3 Jun 2024 19:21:36 +0800 Subject: [PATCH 07/31] refactor: CompactionRequest -> &CompactionRequest --- src/mito2/src/compaction/compactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index fe7c329511ac..b7b2f1db1b45 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -77,7 +77,7 @@ pub struct CompactionRequest { /// Open a compaction region from a compaction request. /// It's simple version of RegionOpener::open(). pub async fn open_compaction_region( - req: CompactionRequest, + req: &CompactionRequest, data_home: &str, object_store_manager: ObjectStoreManager, ) -> Result { From 891426d314aa13266eff2746432d94be07a14f30 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 3 Jun 2024 20:07:56 +0800 Subject: [PATCH 08/31] fix: typo --- src/mito2/src/compaction/compactor.rs | 10 +++++----- src/mito2/src/compaction/task.rs | 2 +- src/mito2/src/compaction/twcs.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index b7b2f1db1b45..fbeb0a63218e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -186,13 +186,13 @@ pub async fn open_compaction_region( /// MergeOutput represents the output of merging SST files. 
pub struct MergeOutput { pub files_to_add: Option>, - pub fileds_to_remove: Option>, + pub files_to_remove: Option>, pub compaction_time_window: Option, } impl MergeOutput { pub fn is_empty(&self) -> bool { - self.files_to_add.is_none() && self.fileds_to_remove.is_none() + self.files_to_add.is_none() && self.files_to_remove.is_none() } } @@ -208,7 +208,7 @@ pub trait Compactor: Send + Sync + 'static { } else { Ok(MergeOutput { files_to_add: None, - fileds_to_remove: None, + files_to_remove: None, compaction_time_window: None, }) } @@ -228,7 +228,7 @@ pub trait Compactor: Send + Sync + 'static { }; let files_to_remove = { - if let Some(files) = merge_output.fileds_to_remove { + if let Some(files) = merge_output.files_to_remove { files } else { return Ok(()); @@ -403,7 +403,7 @@ async fn do_merge_ssts( Ok(MergeOutput { files_to_add: Some(output_files), - fileds_to_remove: Some(inputs), + files_to_remove: Some(inputs), compaction_time_window: Some(picker_output.time_window_size), }) } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 9f9676fe1329..35f6f8b75f06 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -164,7 +164,7 @@ impl CompactionTaskImpl { info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", self.region_id, - compaction_result.fileds_to_remove, + compaction_result.files_to_remove, compaction_result.files_to_add, compaction_result.compaction_time_window, self.waiters.len(), diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index cfd6f763d1fa..50ddb8b3bd50 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -145,7 +145,7 @@ impl Picker for TwcsPicker { if outputs.is_empty() && expired_ssts.is_empty() { // Nothing to compact, we are done. - // You can notifiy all waiters as we consume the compaction request. 
+ // You can notify all waiters as we consume the compaction request. return None; } From 8ff9c6f2281dbbc8c3a6a129e242d6cf13be98b7 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 09:43:51 +0800 Subject: [PATCH 09/31] docs: add docs for public apis --- src/mito2/src/compaction/compactor.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index fbeb0a63218e..852e220f9c75 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -196,8 +196,10 @@ impl MergeOutput { } } +/// Compactor is the trait that defines the compaction logic. #[async_trait::async_trait] pub trait Compactor: Send + Sync + 'static { + /// Merge SST files for a region. async fn merge_ssts( &self, compaction_region: CompactionRegion, @@ -214,6 +216,7 @@ pub trait Compactor: Send + Sync + 'static { } } + /// Update the manifest after merging SST files. async fn update_manifest( &self, compaction_region: CompactionRegion, @@ -262,8 +265,7 @@ pub trait Compactor: Send + Sync + 'static { Ok(()) } - fn picker(&self) -> Arc; - + /// Execute compaction for a region. async fn compact(&self, compaction_region: CompactionRegion) -> Result<()> { let picker_output = self .picker() @@ -280,6 +282,9 @@ pub trait Compactor: Send + Sync + 'static { } self.update_manifest(compaction_region, merge_output).await } + + /// Get the picker of the compactor. + fn picker(&self) -> Arc; } async fn do_merge_ssts( @@ -408,6 +413,7 @@ async fn do_merge_ssts( }) } +/// DefaultCompactor is the default implementation of Compactor. pub struct DefaultCompactor { picker: Arc, } @@ -420,10 +426,12 @@ impl Compactor for DefaultCompactor { } impl DefaultCompactor { + /// Create a new DefaultCompactor with a picker. pub fn new_with_picker(picker: Arc) -> Self { Self { picker } } + /// Create a new DefaultCompactor from a compaction request. 
pub fn new_from_request(req: &CompactionRequest) -> Result { let region_options = RegionOptions::try_from(&req.options)?; let compaction_options = region_options.compaction; From acabc0726d3ca9cbcf1687e29a4ca5d9ff52b179 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 11:59:36 +0800 Subject: [PATCH 10/31] refactor: remove 'Picker' from Compactor --- src/mito2/src/compaction.rs | 32 ++++++++--- src/mito2/src/compaction/compactor.rs | 77 +++++++++++---------------- src/mito2/src/compaction/picker.rs | 2 +- src/mito2/src/compaction/task.rs | 57 ++++++-------------- 4 files changed, 74 insertions(+), 94 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d038368182dd..0d694881792b 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -48,6 +48,7 @@ use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, TimeRangePredicateOverflowSnafu, }; +use crate::metrics::COMPACTION_STAGE_ELAPSED; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; @@ -521,23 +522,42 @@ fn build_compaction_task( picker, region_id ); + let pick_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick"]) + .start_timer(); + let picker_output = picker.pick(version_control.current().version); + drop(pick_timer); + + let picker_output = { + if let Some(picker_output) = picker_output { + picker_output + } else { + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. 
+ for waiter in waiters { + waiter.send(Ok(0)); + } + return None; + } + }; + let task = CompactionTaskImpl { - engine_config: engine_config.clone(), region_id, - metadata: current_version.metadata.clone().clone(), - sst_layer: access_layer.clone(), request_sender, waiters, file_purger, start_time, cache_manager, - append_mode: current_version.options.append_mode, manifest_ctx, version_control, listener, - compactor: Arc::new(DefaultCompactor::new_with_picker(picker)), - picker_output: None, + picker_output, + metadata: current_version.metadata.clone().clone(), + sst_layer: access_layer.clone(), + engine_config: engine_config.clone(), + append_mode: current_version.options.append_mode, + compactor: Arc::new(DefaultCompactor {}), }; + Some(Box::new(task)) } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 852e220f9c75..ef315263f013 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -29,7 +29,7 @@ use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::build_sst_reader; -use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::picker::{new_picker, PickerOutput}; use crate::config::MitoConfig; use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; @@ -37,7 +37,6 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; -use crate::new_picker; use crate::read::Source; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; @@ -49,23 +48,24 @@ use 
crate::sst::file_purger::{FilePurgerRef, LocalFilePurger}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::parquet::WriteOptions; -#[derive(Clone)] /// CompactionRegion represents a region that needs to be compacted. /// It's the subset of MitoRegion. +#[derive(Clone)] pub struct CompactionRegion { pub region_id: RegionId, pub region_options: RegionOptions, - pub engine_config: Arc, - pub region_metadata: RegionMetadataRef, - pub cache_manager: CacheManagerRef, - pub access_layer: AccessLayerRef, - pub file_purger: FilePurgerRef, + pub(crate) engine_config: Arc, + pub(crate) region_metadata: RegionMetadataRef, + pub(crate) cache_manager: CacheManagerRef, + pub(crate) access_layer: AccessLayerRef, + pub(crate) file_purger: FilePurgerRef, pub(crate) manifest_ctx: Arc, pub(crate) version_control: VersionControlRef, } /// CompactionRequest represents the request to compact a region. +#[derive(Debug, Clone)] pub struct CompactionRequest { pub catalog: String, pub schema: String, @@ -203,17 +203,9 @@ pub trait Compactor: Send + Sync + 'static { async fn merge_ssts( &self, compaction_region: CompactionRegion, - picker_output: Option, + picker_output: PickerOutput, ) -> Result { - if let Some(picker_output) = picker_output { - do_merge_ssts(compaction_region, picker_output).await - } else { - Ok(MergeOutput { - files_to_add: None, - files_to_remove: None, - compaction_time_window: None, - }) - } + do_merge_ssts(compaction_region, picker_output).await } /// Update the manifest after merging SST files. @@ -266,10 +258,25 @@ pub trait Compactor: Send + Sync + 'static { } /// Execute compaction for a region. 
- async fn compact(&self, compaction_region: CompactionRegion) -> Result<()> { - let picker_output = self - .picker() + async fn compact( + &self, + compaction_region: CompactionRegion, + compact_request_options: compact_request::Options, + ) -> Result<()> { + let picker_output = { + let picker_output = new_picker( + compact_request_options, + &compaction_region.region_options.compaction, + ) .pick(compaction_region.version_control.current().version); + + if let Some(picker_output) = picker_output { + picker_output + } else { + return Ok(()); + } + }; + let merge_output = self .merge_ssts(compaction_region.clone(), picker_output) .await?; @@ -282,9 +289,6 @@ pub trait Compactor: Send + Sync + 'static { } self.update_manifest(compaction_region, merge_output).await } - - /// Get the picker of the compactor. - fn picker(&self) -> Arc; } async fn do_merge_ssts( @@ -414,28 +418,7 @@ async fn do_merge_ssts( } /// DefaultCompactor is the default implementation of Compactor. -pub struct DefaultCompactor { - picker: Arc, -} +pub struct DefaultCompactor; #[async_trait::async_trait] -impl Compactor for DefaultCompactor { - fn picker(&self) -> Arc { - self.picker.clone() - } -} - -impl DefaultCompactor { - /// Create a new DefaultCompactor with a picker. - pub fn new_with_picker(picker: Arc) -> Self { - Self { picker } - } - - /// Create a new DefaultCompactor from a compaction request. 
- pub fn new_from_request(req: &CompactionRequest) -> Result { - let region_options = RegionOptions::try_from(&req.options)?; - let compaction_options = region_options.compaction; - let picker = new_picker(req.compaction_options.clone(), &compaction_options); - Ok(Self { picker }) - } -} +impl Compactor for DefaultCompactor {} diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index a47e4fde79cf..a60c582c943d 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -35,7 +35,7 @@ pub trait Picker: Debug + Send + Sync + 'static { fn pick(&self, current_version: VersionRef) -> Option; } -#[derive(Clone)] +#[derive(Default, Clone, Debug)] pub struct PickerOutput { pub outputs: Vec, pub expired_ssts: Vec, diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 35f6f8b75f06..08ad938ccf3e 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -61,27 +61,24 @@ pub(crate) struct CompactionTaskImpl { pub(crate) version_control: VersionControlRef, /// Event listener. pub(crate) listener: WorkerListener, - + /// Compactor to handle compaction. pub(crate) compactor: Arc, - pub(crate) picker_output: Option, + /// Output of the picker. 
+ pub(crate) picker_output: PickerOutput, } impl Debug for CompactionTaskImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(picker_output) = &self.picker_output { - f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) - .field("outputs", &picker_output.outputs) - .field("expired_ssts", &picker_output.expired_ssts) - .field("compaction_time_window", &picker_output.time_window_size) - .field("append_mode", &self.append_mode) - .finish() - } else { - f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) - .field("append_mode", &self.append_mode) - .finish() - } + f.debug_struct("TwcsCompactionTask") + .field("region_id", &self.region_id) + .field("outputs", &self.picker_output.outputs) + .field("expired_ssts", &self.picker_output.expired_ssts) + .field( + "compaction_time_window", + &self.picker_output.time_window_size, + ) + .field("append_mode", &self.append_mode) + .finish() } } @@ -93,33 +90,13 @@ impl Drop for CompactionTaskImpl { impl CompactionTaskImpl { fn mark_files_compacting(&self, compacting: bool) { - if let Some(picker_output) = &self.picker_output { - picker_output - .outputs - .iter() - .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting))); - } + self.picker_output + .outputs + .iter() + .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting))); } async fn handle_compaction(&mut self) -> error::Result<()> { - let pick_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick"]) - .start_timer(); - let picker_output = self - .compactor - .picker() - .pick(self.version_control.current().version); - drop(pick_timer); - - if picker_output.is_none() { - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. - for waiter in self.waiters.drain(..) 
{ - waiter.send(Ok(0)); - } - return Ok(()); - } - self.picker_output = picker_output; - self.mark_files_compacting(true); let merge_timer = COMPACTION_STAGE_ELAPSED From de8496c17015e3be2d7e2a17365f7ccd94d2dcfe Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 12:17:45 +0800 Subject: [PATCH 11/31] chore: add logs --- src/mito2/src/compaction/compactor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ef315263f013..bd364b5ddb3b 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -273,6 +273,10 @@ pub trait Compactor: Send + Sync + 'static { if let Some(picker_output) = picker_output { picker_output } else { + info!( + "No files to compact for region_id: {}", + compaction_region.region_id + ); return Ok(()); } }; From 6bee93e741cd0618a03df34b31a4557008ba489d Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 12:24:55 +0800 Subject: [PATCH 12/31] chore: change pub attribute for Picker --- src/mito2/src/lib.rs | 1 - src/mito2/src/region/version.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index c7a6e142cee1..34de124f5243 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -45,7 +45,6 @@ pub mod wal; mod worker; pub use compaction::compactor::*; -pub use compaction::picker::*; pub use compaction::CompactionOutput; #[cfg_attr(doc, aquamarine::aquamarine)] diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index a0087adf1f8e..fa95255c1a5c 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -220,7 +220,7 @@ pub(crate) struct VersionControlData { /// Static metadata of a region. #[derive(Clone, Debug)] -pub struct Version { +pub(crate) struct Version { /// Metadata of the region. 
/// /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing From a63c9f5f3a31a0c1e8d4eeed5ce92cb2e79da7f7 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 15:17:14 +0800 Subject: [PATCH 13/31] refactor: remove do_merge_ssts() --- src/mito2/src/compaction/compactor.rs | 250 +++++++++++++------------- 1 file changed, 122 insertions(+), 128 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index bd364b5ddb3b..2472d9cd5a8c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -203,9 +203,129 @@ pub trait Compactor: Send + Sync + 'static { async fn merge_ssts( &self, compaction_region: CompactionRegion, - picker_output: PickerOutput, + mut picker_output: PickerOutput, ) -> Result { - do_merge_ssts(compaction_region, picker_output).await + let current_version = compaction_region.version_control.current().version; + let mut futs = Vec::with_capacity(picker_output.outputs.len()); + let mut compacted_inputs = + Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); + for output in picker_output.outputs.drain(..) 
{ + compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); + + info!( + "Compaction region {} output [{}]-> {}", + compaction_region.region_id, + output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .collect::>() + .join(","), + output.output_file_id + ); + + let write_opts = WriteOptions { + write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + ..Default::default() + }; + let create_inverted_index = compaction_region + .engine_config + .inverted_index + .create_on_compaction + .auto(); + let mem_threshold_index_create = compaction_region + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| m.as_bytes() as _); + let index_write_buffer_size = Some( + compaction_region + .engine_config + .inverted_index + .write_buffer_size + .as_bytes() as usize, + ); + + let region_metadata = compaction_region.region_metadata.clone(); + let sst_layer = compaction_region.access_layer.clone(); + let region_id = compaction_region.region_id; + let file_id = output.output_file_id; + let cache_manager = compaction_region.cache_manager.clone(); + let storage = compaction_region.region_options.storage.clone(); + let index_options = current_version.options.index_options.clone(); + let append_mode = current_version.options.append_mode; + futs.push(async move { + let reader = build_sst_reader( + region_metadata.clone(), + sst_layer.clone(), + Some(cache_manager.clone()), + &output.inputs, + append_mode, + output.filter_deleted, + output.output_time_range, + ) + .await?; + let file_meta_opt = sst_layer + .write_sst( + SstWriteRequest { + file_id, + metadata: region_metadata, + source: Source::Reader(reader), + cache_manager, + storage, + create_inverted_index, + mem_threshold_index_create, + index_write_buffer_size, + index_options, + }, + &write_opts, + ) + .await? 
+ .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, + }); + Ok(file_meta_opt) + }); + } + let mut output_files = Vec::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = + Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); + for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(common_runtime::spawn_bg(task)); + } + } + let metas = futures::future::try_join_all(task_chunk) + .await + .context(JoinSnafu)? + .into_iter() + .collect::>>()?; + output_files.extend(metas.into_iter().flatten()); + } + + let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); + inputs.extend( + picker_output + .expired_ssts + .iter() + .map(|f| f.meta_ref().clone()), + ); + + Ok(MergeOutput { + files_to_add: Some(output_files), + files_to_remove: Some(inputs), + compaction_time_window: Some(picker_output.time_window_size), + }) } /// Update the manifest after merging SST files. @@ -295,132 +415,6 @@ pub trait Compactor: Send + Sync + 'static { } } -async fn do_merge_ssts( - compaction_region: CompactionRegion, - mut picker_output: PickerOutput, -) -> Result { - let current_version = compaction_region.version_control.current().version; - let mut futs = Vec::with_capacity(picker_output.outputs.len()); - let mut compacted_inputs = - Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); - for output in picker_output.outputs.drain(..) 
{ - compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); - - info!( - "Compaction region {} output [{}]-> {}", - compaction_region.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - output.output_file_id - ); - - let write_opts = WriteOptions { - write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, - ..Default::default() - }; - let create_inverted_index = compaction_region - .engine_config - .inverted_index - .create_on_compaction - .auto(); - let mem_threshold_index_create = compaction_region - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - compaction_region - .engine_config - .inverted_index - .write_buffer_size - .as_bytes() as usize, - ); - - let region_metadata = compaction_region.region_metadata.clone(); - let sst_layer = compaction_region.access_layer.clone(); - let region_id = compaction_region.region_id; - let file_id = output.output_file_id; - let cache_manager = compaction_region.cache_manager.clone(); - let storage = compaction_region.region_options.storage.clone(); - let index_options = current_version.options.index_options.clone(); - let append_mode = current_version.options.append_mode; - futs.push(async move { - let reader = build_sst_reader( - region_metadata.clone(), - sst_layer.clone(), - Some(cache_manager.clone()), - &output.inputs, - append_mode, - output.filter_deleted, - output.output_time_range, - ) - .await?; - let file_meta_opt = sst_layer - .write_sst( - SstWriteRequest { - file_id, - metadata: region_metadata, - source: Source::Reader(reader), - cache_manager, - storage, - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, - index_options, - }, - &write_opts, - ) - .await? 
- .map(|sst_info| FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, - }); - Ok(file_meta_opt) - }); - } - let mut output_files = Vec::with_capacity(futs.len()); - while !futs.is_empty() { - let mut task_chunk = Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); - for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { - if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_bg(task)); - } - } - let metas = futures::future::try_join_all(task_chunk) - .await - .context(JoinSnafu)? - .into_iter() - .collect::>>()?; - output_files.extend(metas.into_iter().flatten()); - } - - let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); - inputs.extend( - picker_output - .expired_ssts - .iter() - .map(|f| f.meta_ref().clone()), - ); - - Ok(MergeOutput { - files_to_add: Some(output_files), - files_to_remove: Some(inputs), - compaction_time_window: Some(picker_output.time_window_size), - }) -} - /// DefaultCompactor is the default implementation of Compactor. pub struct DefaultCompactor; From 3a360c0879c31e224ffc91ddda2a32d0a940ad56 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 4 Jun 2024 16:12:25 +0800 Subject: [PATCH 14/31] refactor: update comments --- src/mito2/src/compaction/picker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index a60c582c943d..8652fbeb2c64 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -29,7 +29,7 @@ pub trait CompactionTask: Debug + Send + Sync + 'static { async fn run(&mut self); } -/// Picker picks input SST files and builds the compaction task. +/// Picker picks input SST files for compaction. 
/// Different compaction strategy may implement different pickers. pub trait Picker: Debug + Send + Sync + 'static { fn pick(&self, current_version: VersionRef) -> Option; From 35feb02dc6093af347c4135386bdf224cf152700 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Wed, 5 Jun 2024 16:54:01 +0800 Subject: [PATCH 15/31] refactor: use CompactionRegion argument in Picker --- src/mito2/src/compaction.rs | 25 ++++++---- src/mito2/src/compaction/compactor.rs | 2 +- src/mito2/src/compaction/picker.rs | 6 +-- src/mito2/src/compaction/task.rs | 66 ++++++++------------------- src/mito2/src/compaction/twcs.rs | 5 +- src/mito2/src/compaction/window.rs | 4 +- src/mito2/src/lib.rs | 1 + 7 files changed, 46 insertions(+), 63 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0d694881792b..4ed0151a6392 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -61,6 +61,7 @@ use crate::sst::file::{FileHandle, FileId, Level}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; +use crate::CompactionRegion; /// Region compaction request. 
pub struct CompactionRequest { @@ -522,10 +523,22 @@ fn build_compaction_task( picker, region_id ); + let compaction_region = CompactionRegion { + region_id, + region_options: current_version.options.clone(), + engine_config: engine_config.clone(), + region_metadata: current_version.metadata.clone(), + cache_manager: cache_manager.clone(), + access_layer: access_layer.clone(), + file_purger: file_purger.clone(), + manifest_ctx: manifest_ctx.clone(), + version_control: version_control.clone(), + }; + let pick_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["pick"]) .start_timer(); - let picker_output = picker.pick(version_control.current().version); + let picker_output = picker.pick(compaction_region.clone()); drop(pick_timer); let picker_output = { @@ -541,20 +554,12 @@ fn build_compaction_task( }; let task = CompactionTaskImpl { - region_id, request_sender, waiters, - file_purger, start_time, - cache_manager, - manifest_ctx, - version_control, listener, picker_output, - metadata: current_version.metadata.clone().clone(), - sst_layer: access_layer.clone(), - engine_config: engine_config.clone(), - append_mode: current_version.options.append_mode, + compaction_region, compactor: Arc::new(DefaultCompactor {}), }; diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 2472d9cd5a8c..bf27ad9e07a2 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -388,7 +388,7 @@ pub trait Compactor: Send + Sync + 'static { compact_request_options, &compaction_region.region_options.compaction, ) - .pick(compaction_region.version_control.current().version); + .pick(compaction_region.clone()); if let Some(picker_output) = picker_output { picker_output diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 8652fbeb2c64..c65bac476fd2 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -21,8 +21,8 @@ use 
crate::compaction::twcs::TwcsPicker; use crate::compaction::window::WindowedCompactionPicker; use crate::compaction::CompactionOutput; use crate::region::options::CompactionOptions; -use crate::region::version::VersionRef; use crate::sst::file::FileHandle; +use crate::CompactionRegion; #[async_trait::async_trait] pub trait CompactionTask: Debug + Send + Sync + 'static { @@ -32,7 +32,7 @@ pub trait CompactionTask: Debug + Send + Sync + 'static { /// Picker picks input SST files for compaction. /// Different compaction strategy may implement different pickers. pub trait Picker: Debug + Send + Sync + 'static { - fn pick(&self, current_version: VersionRef) -> Option; + fn pick(&self, compaction_region: CompactionRegion) -> Option; } #[derive(Default, Clone, Debug)] @@ -42,7 +42,7 @@ pub struct PickerOutput { pub time_window_size: i64, } -pub(crate) fn new_picker( +pub fn new_picker( compact_request_options: compact_request::Options, compaction_options: &CompactionOptions, ) -> Arc { diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 08ad938ccf3e..60cbfe092d73 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -18,47 +18,28 @@ use std::time::Instant; use common_telemetry::{error, info}; use snafu::ResultExt; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; use tokio::sync::mpsc; -use crate::access_layer::AccessLayerRef; -use crate::cache::CacheManagerRef; use crate::compaction::compactor::{CompactionRegion, Compactor}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::config::MitoConfig; use crate::error; use crate::error::CompactRegionSnafu; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; -use crate::region::version::VersionControlRef; -use crate::region::ManifestContextRef; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; -use 
crate::sst::file_purger::FilePurgerRef; use crate::worker::WorkerListener; pub const MAX_PARALLEL_COMPACTION: usize = 8; pub(crate) struct CompactionTaskImpl { - pub engine_config: Arc, - pub region_id: RegionId, - pub metadata: RegionMetadataRef, - pub sst_layer: AccessLayerRef, - pub file_purger: FilePurgerRef, + pub compaction_region: CompactionRegion, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, /// Senders that are used to notify waiters waiting for pending compaction tasks. pub waiters: Vec, /// Start time of compaction task pub start_time: Instant, - pub(crate) cache_manager: CacheManagerRef, - /// The region is using append mode. - pub(crate) append_mode: bool, - /// Manifest context. - pub(crate) manifest_ctx: ManifestContextRef, - /// Version control to update. - pub(crate) version_control: VersionControlRef, /// Event listener. pub(crate) listener: WorkerListener, /// Compactor to handle compaction. @@ -70,14 +51,17 @@ pub(crate) struct CompactionTaskImpl { impl Debug for CompactionTaskImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) + .field("region_id", &self.compaction_region.region_id) .field("outputs", &self.picker_output.outputs) .field("expired_ssts", &self.picker_output.expired_ssts) .field( "compaction_time_window", &self.picker_output.time_window_size, ) - .field("append_mode", &self.append_mode) + .field( + "append_mode", + &self.compaction_region.region_options.append_mode, + ) .finish() } } @@ -103,26 +87,14 @@ impl CompactionTaskImpl { .with_label_values(&["merge"]) .start_timer(); - let compaction_region = CompactionRegion { - region_id: self.region_id, - region_options: self.version_control.current().version.options.clone(), - engine_config: self.engine_config.clone(), - region_metadata: self.metadata.clone(), - manifest_ctx: self.manifest_ctx.clone(), - version_control: self.version_control.clone(), - 
access_layer: self.sst_layer.clone(), - cache_manager: self.cache_manager.clone(), - file_purger: self.file_purger.clone(), - }; - let compaction_result = match self .compactor - .merge_ssts(compaction_region.clone(), self.picker_output.clone()) + .merge_ssts(self.compaction_region.clone(), self.picker_output.clone()) .await { Ok(v) => v, Err(e) => { - error!(e; "Failed to compact region: {}", self.region_id); + error!(e; "Failed to compact region: {}", self.compaction_region.region_id); merge_timer.stop_and_discard(); return Err(e); } @@ -133,14 +105,14 @@ impl CompactionTaskImpl { if compaction_result.is_empty() { info!( "No files to compact, region_id: {}, window: {:?}", - self.region_id, compaction_result.compaction_time_window + self.compaction_region.region_id, compaction_result.compaction_time_window ); return Ok(()); } info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", - self.region_id, + self.compaction_region.region_id, compaction_result.files_to_remove, compaction_result.files_to_add, compaction_result.compaction_time_window, @@ -148,13 +120,15 @@ impl CompactionTaskImpl { merge_time, ); - self.listener.on_merge_ssts_finished(self.region_id).await; + self.listener + .on_merge_ssts_finished(self.compaction_region.region_id) + .await; let _manifest_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["write_manifest"]) .start_timer(); self.compactor - .update_manifest(compaction_region.clone(), compaction_result) + .update_manifest(self.compaction_region.clone(), compaction_result) .await?; Ok(()) } @@ -164,7 +138,7 @@ impl CompactionTaskImpl { COMPACTION_FAILURE_COUNT.inc(); for waiter in self.waiters.drain(..) 
{ waiter.send(Err(err.clone()).context(CompactRegionSnafu { - region_id: self.region_id, + region_id: self.compaction_region.region_id, })); } } @@ -174,7 +148,7 @@ impl CompactionTaskImpl { if let Err(e) = self.request_sender.send(request).await { error!( "Failed to notify compaction job status for region {}, request: {:?}", - self.region_id, e.0 + self.compaction_region.region_id, e.0 ); } } @@ -185,24 +159,24 @@ impl CompactionTask for CompactionTaskImpl { async fn run(&mut self) { let notify = match self.handle_compaction().await { Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished { - region_id: self.region_id, + region_id: self.compaction_region.region_id, senders: std::mem::take(&mut self.waiters), start_time: self.start_time, }), Err(e) => { - error!(e; "Failed to compact region, region id: {}", self.region_id); + error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id); let err = Arc::new(e); // notify compaction waiters self.on_failure(err.clone()); BackgroundNotify::CompactionFailed(CompactionFailed { - region_id: self.region_id, + region_id: self.compaction_region.region_id, err, }) } }; self.send_to_worker(WorkerRequest::Background { - region_id: self.region_id, + region_id: self.compaction_region.region_id, notify, }) .await; diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 50ddb8b3bd50..0ba8a85729be 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,9 +24,9 @@ use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; use crate::sst::version::LevelMeta; +use crate::CompactionRegion; /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. 
@@ -110,7 +110,8 @@ impl TwcsPicker { } impl Picker for TwcsPicker { - fn pick(&self, current_version: VersionRef) -> Option { + fn pick(&self, compaction_region: CompactionRegion) -> Option { + let current_version = compaction_region.version_control.current().version; let region_metadata = current_version.metadata.clone(); let region_id = region_metadata.region_id; diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 8cc42baa307d..ab7ecdd8892d 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -27,6 +27,7 @@ use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; +use crate::CompactionRegion; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -100,7 +101,8 @@ impl WindowedCompactionPicker { } impl Picker for WindowedCompactionPicker { - fn pick(&self, current_version: VersionRef) -> Option { + fn pick(&self, compaction_region: CompactionRegion) -> Option { + let current_version = compaction_region.version_control.current().version; let (outputs, expired_ssts, time_window) = self.pick_inner( current_version.metadata.region_id, &current_version, diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 34de124f5243..6d9a2cc302fc 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -45,6 +45,7 @@ pub mod wal; mod worker; pub use compaction::compactor::*; +pub use compaction::picker::{new_picker, Picker, PickerOutput}; pub use compaction::CompactionOutput; #[cfg_attr(doc, aquamarine::aquamarine)] From 55033f99847ddf8fbc259a890b44b174e4b37a89 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 15:12:19 +0800 Subject: [PATCH 16/31] chore: make compaction module public and remove unnessary clone
--- src/mito2/src/compaction.rs | 5 ++--- src/mito2/src/compaction/compactor.rs | 12 +++++------- src/mito2/src/compaction/picker.rs | 4 ++-- src/mito2/src/compaction/task.rs | 4 ++-- src/mito2/src/compaction/twcs.rs | 4 ++-- src/mito2/src/compaction/window.rs | 4 ++-- src/mito2/src/lib.rs | 6 +----- 7 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 4ed0151a6392..94755dde8a58 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -40,7 +40,7 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; -use crate::compaction::compactor::DefaultCompactor; +use crate::compaction::compactor::{CompactionRegion, DefaultCompactor}; use crate::compaction::picker::{new_picker, CompactionTask}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; @@ -61,7 +61,6 @@ use crate::sst::file::{FileHandle, FileId, Level}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; -use crate::CompactionRegion; /// Region compaction request. pub struct CompactionRequest { @@ -538,7 +537,7 @@ fn build_compaction_task( let pick_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["pick"]) .start_timer(); - let picker_output = picker.pick(compaction_region.clone()); + let picker_output = picker.pick(&compaction_region); drop(pick_timer); let picker_output = { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index bf27ad9e07a2..ab3d86e56b3b 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -202,7 +202,7 @@ pub trait Compactor: Send + Sync + 'static { /// Merge SST files for a region. 
async fn merge_ssts( &self, - compaction_region: CompactionRegion, + compaction_region: &CompactionRegion, mut picker_output: PickerOutput, ) -> Result { let current_version = compaction_region.version_control.current().version; @@ -331,7 +331,7 @@ pub trait Compactor: Send + Sync + 'static { /// Update the manifest after merging SST files. async fn update_manifest( &self, - compaction_region: CompactionRegion, + compaction_region: &CompactionRegion, merge_output: MergeOutput, ) -> Result<()> { let files_to_add = { @@ -380,7 +380,7 @@ pub trait Compactor: Send + Sync + 'static { /// Execute compaction for a region. async fn compact( &self, - compaction_region: CompactionRegion, + compaction_region: &CompactionRegion, compact_request_options: compact_request::Options, ) -> Result<()> { let picker_output = { @@ -388,7 +388,7 @@ pub trait Compactor: Send + Sync + 'static { compact_request_options, &compaction_region.region_options.compaction, ) - .pick(compaction_region.clone()); + .pick(&compaction_region); if let Some(picker_output) = picker_output { picker_output @@ -401,9 +401,7 @@ pub trait Compactor: Send + Sync + 'static { } }; - let merge_output = self - .merge_ssts(compaction_region.clone(), picker_output) - .await?; + let merge_output = self.merge_ssts(&compaction_region, picker_output).await?; if merge_output.is_empty() { info!( "No files to compact for region_id: {}", diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index c65bac476fd2..ba439bdf92ea 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -17,12 +17,12 @@ use std::sync::Arc; use api::v1::region::compact_request; +use crate::compaction::compactor::CompactionRegion; use crate::compaction::twcs::TwcsPicker; use crate::compaction::window::WindowedCompactionPicker; use crate::compaction::CompactionOutput; use crate::region::options::CompactionOptions; use crate::sst::file::FileHandle; -use crate::CompactionRegion; 
#[async_trait::async_trait] pub trait CompactionTask: Debug + Send + Sync + 'static { @@ -32,7 +32,7 @@ pub trait CompactionTask: Debug + Send + Sync + 'static { /// Picker picks input SST files for compaction. /// Different compaction strategy may implement different pickers. pub trait Picker: Debug + Send + Sync + 'static { - fn pick(&self, compaction_region: CompactionRegion) -> Option; + fn pick(&self, compaction_region: &CompactionRegion) -> Option; } #[derive(Default, Clone, Debug)] diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 60cbfe092d73..e9394f90e2c5 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -89,7 +89,7 @@ impl CompactionTaskImpl { let compaction_result = match self .compactor - .merge_ssts(self.compaction_region.clone(), self.picker_output.clone()) + .merge_ssts(&self.compaction_region, self.picker_output.clone()) .await { Ok(v) => v, @@ -128,7 +128,7 @@ impl CompactionTaskImpl { .with_label_values(&["write_manifest"]) .start_timer(); self.compactor - .update_manifest(self.compaction_region.clone(), compaction_result) + .update_manifest(&self.compaction_region, compaction_result) .await?; Ok(()) } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 0ba8a85729be..3bfe56015192 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -22,11 +22,11 @@ use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; +use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::sst::file::{FileHandle, FileId}; use crate::sst::version::LevelMeta; -use crate::CompactionRegion; /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. 
@@ -110,7 +110,7 @@ impl TwcsPicker { } impl Picker for TwcsPicker { - fn pick(&self, compaction_region: CompactionRegion) -> Option { + fn pick(&self, compaction_region: &CompactionRegion) -> Option { let current_version = compaction_region.version_control.current().version; let region_metadata = current_version.metadata.clone(); let region_id = region_metadata.region_id; diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index ab7ecdd8892d..3d4086a25ce9 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -23,11 +23,11 @@ use common_time::Timestamp; use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; +use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; -use crate::CompactionRegion; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -101,7 +101,7 @@ impl WindowedCompactionPicker { } impl Picker for WindowedCompactionPicker { - fn pick(&self, compaction_region: CompactionRegion) -> Option { + fn pick(&self, compaction_region: &CompactionRegion) -> Option { let current_version = compaction_region.version_control.current().version; let (outputs, expired_ssts, time_window) = self.pick_inner( current_version.metadata.region_id, diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 6d9a2cc302fc..cdd2416940ce 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -25,7 +25,7 @@ pub mod test_util; mod access_layer; mod cache; -mod compaction; +pub mod compaction; pub mod config; pub mod engine; pub mod error; @@ -44,10 +44,6 @@ mod time_provider; pub mod wal; mod worker; -pub use compaction::compactor::*; -pub use 
compaction::picker::{new_picker, Picker, PickerOutput}; -pub use compaction::CompactionOutput; - #[cfg_attr(doc, aquamarine::aquamarine)] /// # Mito developer document /// From 15c102c43b9a90c9a721cbfea8aa3478961e6d46 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 15:23:27 +0800 Subject: [PATCH 17/31] refactor: move build_compaction_task() in CompactionScheduler{} --- src/mito2/src/compaction.rs | 135 ++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 68 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 94755dde8a58..46cb11ccfffe 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -230,7 +230,7 @@ impl CompactionScheduler { options: compact_request::Options, ) -> Result<()> { let region_id = request.region_id(); - let Some(mut task) = build_compaction_task(request, options) else { + let Some(mut task) = self.build_compaction_task(request, options) else { // Nothing to compact, remove it from the region status map. self.region_status.remove(&region_id); return Ok(()); @@ -258,6 +258,72 @@ impl CompactionScheduler { // Notifies all pending tasks.
status.on_failure(err); } + + fn build_compaction_task( + &self, + req: CompactionRequest, + options: compact_request::Options, + ) -> Option> { + let picker = new_picker(options, &req.current_version.options.compaction); + let region_id = req.region_id(); + let CompactionRequest { + engine_config, + current_version, + access_layer, + request_sender, + waiters, + file_purger, + start_time, + cache_manager, + manifest_ctx, + version_control, + listener, + } = req; + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + + let compaction_region = CompactionRegion { + region_id, + region_options: current_version.options.clone(), + engine_config: engine_config.clone(), + region_metadata: current_version.metadata.clone(), + cache_manager: cache_manager.clone(), + access_layer: access_layer.clone(), + file_purger: file_purger.clone(), + manifest_ctx: manifest_ctx.clone(), + version_control: version_control.clone(), + }; + + let pick_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick"]) + .start_timer(); + let picker_output = picker.pick(&compaction_region); + drop(pick_timer); + + let picker_output = if let Some(picker_output) = picker_output { + picker_output + } else { + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. 
+ for waiter in waiters { + waiter.send(Ok(0)); + } + return None; + }; + + let task = CompactionTaskImpl { + request_sender, + waiters, + start_time, + listener, + picker_output, + compaction_region, + compactor: Arc::new(DefaultCompactor {}), + }; + + Some(Box::new(task)) + } } impl Drop for CompactionScheduler { @@ -498,73 +564,6 @@ fn get_expired_ssts( .collect() } -fn build_compaction_task( - req: CompactionRequest, - options: compact_request::Options, -) -> Option> { - let picker = new_picker(options, &req.current_version.options.compaction); - let region_id = req.region_id(); - let CompactionRequest { - engine_config, - current_version, - access_layer, - request_sender, - waiters, - file_purger, - start_time, - cache_manager, - manifest_ctx, - version_control, - listener, - } = req; - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, region_id - ); - - let compaction_region = CompactionRegion { - region_id, - region_options: current_version.options.clone(), - engine_config: engine_config.clone(), - region_metadata: current_version.metadata.clone(), - cache_manager: cache_manager.clone(), - access_layer: access_layer.clone(), - file_purger: file_purger.clone(), - manifest_ctx: manifest_ctx.clone(), - version_control: version_control.clone(), - }; - - let pick_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick"]) - .start_timer(); - let picker_output = picker.pick(&compaction_region); - drop(pick_timer); - - let picker_output = { - if let Some(picker_output) = picker_output { - picker_output - } else { - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. 
- for waiter in waiters { - waiter.send(Ok(0)); - } - return None; - } - }; - - let task = CompactionTaskImpl { - request_sender, - waiters, - start_time, - listener, - picker_output, - compaction_region, - compactor: Arc::new(DefaultCompactor {}), - }; - - Some(Box::new(task)) -} - #[cfg(test)] mod tests { use std::sync::Mutex; From 8ad6fff5667c1ffa768cfe17a6daf57436a3b023 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 15:55:03 +0800 Subject: [PATCH 18/31] chore: use in open_compaction_region() and add some comments for public structure --- src/mito2/src/compaction/compactor.rs | 10 +++------- src/mito2/src/compaction/picker.rs | 5 ++++- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ab3d86e56b3b..4122075dd887 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -78,7 +78,7 @@ pub struct CompactionRequest { /// It's simple version of RegionOpener::open(). pub async fn open_compaction_region( req: &CompactionRequest, - data_home: &str, + mito_config: &MitoConfig, object_store_manager: ObjectStoreManager, ) -> Result { let region_options = RegionOptions::try_from(&req.options)?; @@ -100,9 +100,6 @@ pub async fn open_compaction_region( } }; - let mut mito_config = MitoConfig::default(); - mito_config.sanitize(data_home)?; - let access_layer = { let intermediate_manager = IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) @@ -183,7 +180,7 @@ pub async fn open_compaction_region( }) } -/// MergeOutput represents the output of merging SST files. +/// `[MergeOutput]` represents the output of merging SST files. 
pub struct MergeOutput { pub files_to_add: Option>, pub files_to_remove: Option>, @@ -362,8 +359,7 @@ pub trait Compactor: Send + Sync + 'static { }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - // We might leak files if we fail to update manifest. We can add a cleanup task to - // remove them later. + // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later. compaction_region .manifest_ctx .update_manifest(Writable, action_list, || { diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index ba439bdf92ea..da551d448598 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -25,7 +25,7 @@ use crate::region::options::CompactionOptions; use crate::sst::file::FileHandle; #[async_trait::async_trait] -pub trait CompactionTask: Debug + Send + Sync + 'static { +pub(crate) trait CompactionTask: Debug + Send + Sync + 'static { async fn run(&mut self); } @@ -35,6 +35,8 @@ pub trait Picker: Debug + Send + Sync + 'static { fn pick(&self, compaction_region: &CompactionRegion) -> Option; } +/// PickerOutput is the output of a [`Picker`]. +/// It contains the outputs of the compaction and the expired SST files. #[derive(Default, Clone, Debug)] pub struct PickerOutput { pub outputs: Vec, @@ -42,6 +44,7 @@ pub struct PickerOutput { pub time_window_size: i64, } +/// Create a new picker based on the compaction request options and compaction options. 
pub fn new_picker( compact_request_options: compact_request::Options, compaction_options: &CompactionOptions, From 26ae2a98354a9e72673f4be3bd018b4772d263c6 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 16:05:34 +0800 Subject: [PATCH 19/31] refactor: add 'manifest_dir()' in store-api --- src/mito2/src/compaction/compactor.rs | 5 ++--- src/store-api/src/path_utils.rs | 13 +++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 4122075dd887..0721d939cdf4 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -19,11 +19,10 @@ use std::time::Duration; use api::v1::region::compact_request; use common_telemetry::info; use object_store::manager::ObjectStoreManager; -use object_store::util::join_dir; use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::path_utils::region_dir; +use store_api::path_utils::{manifest_dir, region_dir}; use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; @@ -114,7 +113,7 @@ pub async fn open_compaction_region( let manifest_manager = { let region_manifest_options = RegionManifestOptions { - manifest_dir: join_dir(®ion_dir, "manifest"), + manifest_dir: manifest_dir(region_dir.as_str(), req.region_id), object_store: object_store.clone(), compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, diff --git a/src/store-api/src/path_utils.rs b/src/store-api/src/path_utils.rs index b83f6e33d5f4..fa6ae5f13d73 100644 --- a/src/store-api/src/path_utils.rs +++ b/src/store-api/src/path_utils.rs @@ -45,6 +45,10 @@ pub fn region_dir(path: &str, region_id: RegionId) -> String { ) } +pub fn manifest_dir(path: &str, region_id: RegionId) -> String { + format!("{}manifest/", region_dir(path, region_id)) +} + 
#[cfg(test)] mod tests { use super::*; @@ -57,4 +61,13 @@ mod tests { "data/my_catalog/my_schema/42/42_0000000001/" ); } + + #[test] + fn test_manifest_dir() { + let region_id = RegionId::new(42, 1); + assert_eq!( + manifest_dir("my_catalog/my_schema", region_id), + "data/my_catalog/my_schema/42/42_0000000001/manifest/" + ); + } } From 9a05b6a28c326aba52989bf7d4dcab6b42c3fe30 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 16:14:50 +0800 Subject: [PATCH 20/31] refactor: move the default implementation to DefaultCompactor --- src/mito2/src/compaction/compactor.rs | 34 ++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 0721d939cdf4..ec7ffbfa5ffb 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -196,6 +196,32 @@ impl MergeOutput { #[async_trait::async_trait] pub trait Compactor: Send + Sync + 'static { /// Merge SST files for a region. + async fn merge_ssts( + &self, + compaction_region: &CompactionRegion, + picker_output: PickerOutput, + ) -> Result; + + /// Update the manifest after merging SST files. + async fn update_manifest( + &self, + compaction_region: &CompactionRegion, + merge_output: MergeOutput, + ) -> Result<()>; + + /// Execute compaction for a region. + async fn compact( + &self, + compaction_region: &CompactionRegion, + compact_request_options: compact_request::Options, + ) -> Result<()>; +} + +/// DefaultCompactor is the default implementation of Compactor. +pub struct DefaultCompactor; + +#[async_trait::async_trait] +impl Compactor for DefaultCompactor { async fn merge_ssts( &self, compaction_region: &CompactionRegion, @@ -324,7 +350,6 @@ pub trait Compactor: Send + Sync + 'static { }) } - /// Update the manifest after merging SST files. 
async fn update_manifest( &self, compaction_region: &CompactionRegion, @@ -372,7 +397,6 @@ pub trait Compactor: Send + Sync + 'static { Ok(()) } - /// Execute compaction for a region. async fn compact( &self, compaction_region: &CompactionRegion, @@ -407,9 +431,3 @@ pub trait Compactor: Send + Sync + 'static { self.update_manifest(compaction_region, merge_output).await } } - -/// DefaultCompactor is the default implementation of Compactor. -pub struct DefaultCompactor; - -#[async_trait::async_trait] -impl Compactor for DefaultCompactor {} From 130fb908d32efa98c04bd05c6ae464b9c57e0499 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 16:20:48 +0800 Subject: [PATCH 21/31] refactor: remove Options from MergeOutput --- src/mito2/src/compaction/compactor.rs | 33 +++++++++------------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ec7ffbfa5ffb..1dd5f6989586 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -180,15 +180,16 @@ pub async fn open_compaction_region( } /// `[MergeOutput]` represents the output of merging SST files. 
+#[derive(Default, Clone, Debug)] pub struct MergeOutput { - pub files_to_add: Option>, - pub files_to_remove: Option>, + pub files_to_add: Vec, + pub files_to_remove: Vec, pub compaction_time_window: Option, } impl MergeOutput { pub fn is_empty(&self) -> bool { - self.files_to_add.is_none() && self.files_to_remove.is_none() + self.files_to_add.is_empty() && self.files_to_remove.is_empty() } } @@ -344,8 +345,8 @@ impl Compactor for DefaultCompactor { ); Ok(MergeOutput { - files_to_add: Some(output_files), - files_to_remove: Some(inputs), + files_to_add: output_files, + files_to_remove: inputs, compaction_time_window: Some(picker_output.time_window_size), }) } @@ -355,26 +356,14 @@ impl Compactor for DefaultCompactor { compaction_region: &CompactionRegion, merge_output: MergeOutput, ) -> Result<()> { - let files_to_add = { - if let Some(files) = merge_output.files_to_add { - files - } else { - return Ok(()); - } - }; - - let files_to_remove = { - if let Some(files) = merge_output.files_to_remove { - files - } else { - return Ok(()); - } - }; + if merge_output.files_to_add.is_empty() || merge_output.files_to_remove.is_empty() { + return Ok(()); + } // Write region edit to manifest. 
let edit = RegionEdit { - files_to_add, - files_to_remove, + files_to_add: merge_output.files_to_add, + files_to_remove: merge_output.files_to_remove, compaction_time_window: merge_output .compaction_time_window .map(|seconds| Duration::from_secs(seconds as u64)), From e141e91f9d069fcfa4eedec8d5c16229555ade6a Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 16:24:20 +0800 Subject: [PATCH 22/31] chore: minor modification --- src/mito2/src/compaction/compactor.rs | 2 ++ src/mito2/src/compaction/picker.rs | 1 + src/mito2/src/compaction/task.rs | 8 ++------ 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 1dd5f6989586..35386ef7deec 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -232,6 +232,7 @@ impl Compactor for DefaultCompactor { let mut futs = Vec::with_capacity(picker_output.outputs.len()); let mut compacted_inputs = Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); + for output in picker_output.outputs.drain(..) { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); @@ -383,6 +384,7 @@ impl Compactor for DefaultCompactor { ); }) .await?; + Ok(()) } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index da551d448598..715e8effecdc 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -32,6 +32,7 @@ pub(crate) trait CompactionTask: Debug + Send + Sync + 'static { /// Picker picks input SST files for compaction. /// Different compaction strategy may implement different pickers. pub trait Picker: Debug + Send + Sync + 'static { + /// Picks input SST files for compaction. 
fn pick(&self, compaction_region: &CompactionRegion) -> Option; } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index e9394f90e2c5..645b3678734f 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -30,6 +30,7 @@ use crate::request::{ }; use crate::worker::WorkerListener; +/// Maximum number of compaction tasks in parallel. pub const MAX_PARALLEL_COMPACTION: usize = 8; pub(crate) struct CompactionTaskImpl { @@ -52,12 +53,7 @@ impl Debug for CompactionTaskImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TwcsCompactionTask") .field("region_id", &self.compaction_region.region_id) - .field("outputs", &self.picker_output.outputs) - .field("expired_ssts", &self.picker_output.expired_ssts) - .field( - "compaction_time_window", - &self.picker_output.time_window_size, - ) + .field("picker_output", &self.picker_output) .field( "append_mode", &self.compaction_region.region_options.append_mode, From e3e6f121c4a339a516fcd98947e4de191aed88cc Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 16:44:45 +0800 Subject: [PATCH 23/31] fix: clippy errors --- src/mito2/src/compaction.rs | 11 ++++++----- src/mito2/src/compaction/compactor.rs | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 46cb11ccfffe..2aea558c707a 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -296,11 +296,12 @@ impl CompactionScheduler { version_control: version_control.clone(), }; - let pick_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick"]) - .start_timer(); - let picker_output = picker.pick(&compaction_region); - drop(pick_timer); + let picker_output = { + let _pick_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick"]) + .start_timer(); + picker.pick(&compaction_region) + }; let picker_output = if let Some(picker_output) = picker_output { picker_output diff --git 
a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 35386ef7deec..6dca4e2f1775 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -398,7 +398,7 @@ impl Compactor for DefaultCompactor { compact_request_options, &compaction_region.region_options.compaction, ) - .pick(&compaction_region); + .pick(compaction_region); if let Some(picker_output) = picker_output { picker_output @@ -411,7 +411,7 @@ impl Compactor for DefaultCompactor { } }; - let merge_output = self.merge_ssts(&compaction_region, picker_output).await?; + let merge_output = self.merge_ssts(compaction_region, picker_output).await?; if merge_output.is_empty() { info!( "No files to compact for region_id: {}", From 93f5a74cc62ee72036c5f70421f708d0c4c4cb34 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 18:24:54 +0800 Subject: [PATCH 24/31] fix: unit test errors --- src/mito2/src/compaction/compactor.rs | 2 +- src/mito2/src/compaction/twcs.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 6dca4e2f1775..565ceb1adf4c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -357,7 +357,7 @@ impl Compactor for DefaultCompactor { compaction_region: &CompactionRegion, merge_output: MergeOutput, ) -> Result<()> { - if merge_output.files_to_add.is_empty() || merge_output.files_to_remove.is_empty() { + if merge_output.is_empty() { return Ok(()); } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 3bfe56015192..2273f2384928 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -145,8 +145,6 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window); if outputs.is_empty() && expired_ssts.is_empty() { - // Nothing to compact, we are done. 
- // You can notify all waiters as we consume the compaction request. return None; } From 17bf852d3aa59ce056bc863bbd2d6beec534a03f Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 18:48:30 +0800 Subject: [PATCH 25/31] refactor: remove 'manifest_dir()' from store-api crate(already have one in opener) --- src/mito2/src/compaction/compactor.rs | 5 +++-- src/mito2/src/region/opener.rs | 2 +- src/store-api/src/path_utils.rs | 13 ------------- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 565ceb1adf4c..b33e2f102506 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -22,7 +22,7 @@ use object_store::manager::ObjectStoreManager; use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::path_utils::{manifest_dir, region_dir}; +use store_api::path_utils::region_dir; use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; @@ -37,6 +37,7 @@ use crate::manifest::storage::manifest_compress_type; use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; use crate::read::Source; +use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::ManifestContext; @@ -113,7 +114,7 @@ pub async fn open_compaction_region( let manifest_manager = { let region_manifest_options = RegionManifestOptions { - manifest_dir: manifest_dir(region_dir.as_str(), req.region_id), + manifest_dir: new_manifest_dir(region_dir.as_str()), object_store: object_store.clone(), compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs 
index f046471cfc2b..e20a00d35ae1 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -530,6 +530,6 @@ where } /// Returns the directory to the manifest files. -fn new_manifest_dir(region_dir: &str) -> String { +pub(crate) fn new_manifest_dir(region_dir: &str) -> String { join_dir(region_dir, "manifest") } diff --git a/src/store-api/src/path_utils.rs b/src/store-api/src/path_utils.rs index fa6ae5f13d73..b83f6e33d5f4 100644 --- a/src/store-api/src/path_utils.rs +++ b/src/store-api/src/path_utils.rs @@ -45,10 +45,6 @@ pub fn region_dir(path: &str, region_id: RegionId) -> String { ) } -pub fn manifest_dir(path: &str, region_id: RegionId) -> String { - format!("{}manifest/", region_dir(path, region_id)) -} - #[cfg(test)] mod tests { use super::*; @@ -61,13 +57,4 @@ mod tests { "data/my_catalog/my_schema/42/42_0000000001/" ); } - - #[test] - fn test_manifest_dir() { - let region_id = RegionId::new(42, 1); - assert_eq!( - manifest_dir("my_catalog/my_schema", region_id), - "data/my_catalog/my_schema/42/42_0000000001/manifest/" - ); - } } From 48f9398a84c6ff535676d892bdd75c80b12b9488 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 20:17:02 +0800 Subject: [PATCH 26/31] refactor: use 'region_dir' in CompactionRequest --- src/mito2/src/compaction/compactor.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index b33e2f102506..3e67bea7f083 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -22,7 +22,6 @@ use object_store::manager::ObjectStoreManager; use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::path_utils::region_dir; use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; @@ -67,9 +66,8 @@ pub struct CompactionRegion { /// CompactionRequest represents the 
request to compact a region. #[derive(Debug, Clone)] pub struct CompactionRequest { - pub catalog: String, - pub schema: String, pub region_id: RegionId, + pub region_dir: String, pub options: HashMap, pub compaction_options: compact_request::Options, } @@ -82,11 +80,6 @@ pub async fn open_compaction_region( object_store_manager: ObjectStoreManager, ) -> Result { let region_options = RegionOptions::try_from(&req.options)?; - let region_dir = region_dir( - format!("{}/{}", &req.catalog, &req.schema).as_str(), - req.region_id, - ); - let object_store = { let name = ®ion_options.storage; if let Some(name) = name { @@ -106,7 +99,7 @@ pub async fn open_compaction_region( .await?; Arc::new(AccessLayer::new( - region_dir.to_string(), + req.region_dir.as_str(), object_store.clone(), intermediate_manager, )) @@ -114,7 +107,7 @@ pub async fn open_compaction_region( let manifest_manager = { let region_manifest_options = RegionManifestOptions { - manifest_dir: new_manifest_dir(region_dir.as_str()), + manifest_dir: new_manifest_dir(req.region_dir.as_str()), object_store: object_store.clone(), compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, @@ -124,7 +117,7 @@ pub async fn open_compaction_region( .await? .context(EmptyRegionDirSnafu { region_id: req.region_id, - region_dir, + region_dir: req.region_dir.as_str(), })? 
}; From 2f6d9ef93b9be522e4b59bd375c66db75495e3c0 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 6 Jun 2024 20:19:46 +0800 Subject: [PATCH 27/31] refactor: refine naming --- src/mito2/src/compaction/compactor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 3e67bea7f083..f5f306f50afc 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -68,7 +68,7 @@ pub struct CompactionRegion { pub struct CompactionRequest { pub region_id: RegionId, pub region_dir: String, - pub options: HashMap, + pub region_options: HashMap, pub compaction_options: compact_request::Options, } @@ -79,7 +79,7 @@ pub async fn open_compaction_region( mito_config: &MitoConfig, object_store_manager: ObjectStoreManager, ) -> Result { - let region_options = RegionOptions::try_from(&req.options)?; + let region_options = RegionOptions::try_from(&req.region_options)?; let object_store = { let name = ®ion_options.storage; if let Some(name) = name { From d26c64a11db50f86fd92f715c67dbde8c87ee8c7 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 7 Jun 2024 18:20:50 +0800 Subject: [PATCH 28/31] refactor: refine naming --- src/mito2/src/compaction/compactor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 39a9e48f26bd..9be02940e383 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -62,9 +62,9 @@ pub struct CompactionRegion { pub(crate) current_version: VersionRef, } -/// CompactionRequest represents the request to compact a region. +/// CompactorRequest represents the request to compact a region. 
#[derive(Debug, Clone)] -pub struct CompactionRequest { +pub struct CompactorRequest { pub region_id: RegionId, pub region_dir: String, pub region_options: HashMap, @@ -74,7 +74,7 @@ pub struct CompactionRequest { /// Open a compaction region from a compaction request. /// It's simple version of RegionOpener::open(). pub async fn open_compaction_region( - req: &CompactionRequest, + req: &CompactorRequest, mito_config: &MitoConfig, object_store_manager: ObjectStoreManager, ) -> Result { From 496818659967125394f6b9d576e0d9533203d2c8 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 7 Jun 2024 18:37:04 +0800 Subject: [PATCH 29/31] refactor: remove clone() --- src/mito2/src/compaction/compactor.rs | 9 ++++++--- src/mito2/src/compaction/twcs.rs | 12 +++++------- src/mito2/src/compaction/window.rs | 5 ++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 9be02940e383..16d3eaf1e89c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -221,7 +221,6 @@ impl Compactor for DefaultCompactor { compaction_region: &CompactionRegion, mut picker_output: PickerOutput, ) -> Result { - let current_version = compaction_region.current_version.clone(); let mut futs = Vec::with_capacity(picker_output.outputs.len()); let mut compacted_inputs = Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); @@ -269,8 +268,12 @@ impl Compactor for DefaultCompactor { let file_id = output.output_file_id; let cache_manager = compaction_region.cache_manager.clone(); let storage = compaction_region.region_options.storage.clone(); - let index_options = current_version.options.index_options.clone(); - let append_mode = current_version.options.append_mode; + let index_options = compaction_region + .current_version + .options + .index_options + .clone(); + let append_mode = compaction_region.current_version.options.append_mode; futs.push(async 
move { let reader = build_sst_reader( region_metadata.clone(), diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 71ab09bc1c9c..15937483e8e5 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -111,12 +111,9 @@ impl TwcsPicker { impl Picker for TwcsPicker { fn pick(&self, compaction_region: &CompactionRegion) -> Option { - let current_version = compaction_region.current_version.clone(); - let region_metadata = current_version.metadata.clone(); - let region_id = region_metadata.region_id; - - let levels = current_version.ssts.levels(); - let ttl = current_version.options.ttl; + let region_id = compaction_region.region_id; + let levels = compaction_region.current_version.ssts.levels(); + let ttl = compaction_region.current_version.options.ttl; let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); if !expired_ssts.is_empty() { info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); @@ -124,7 +121,8 @@ impl Picker for TwcsPicker { expired_ssts.iter().for_each(|f| f.set_compacting(true)); } - let compaction_time_window = current_version + let compaction_time_window = compaction_region + .current_version .compaction_time_window .map(|window| window.as_secs() as i64); let time_window_size = compaction_time_window diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 798e6a137c34..1683d28f9a9c 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -102,10 +102,9 @@ impl WindowedCompactionPicker { impl Picker for WindowedCompactionPicker { fn pick(&self, compaction_region: &CompactionRegion) -> Option { - let current_version = compaction_region.current_version.clone(); let (outputs, expired_ssts, time_window) = self.pick_inner( - current_version.metadata.region_id, - ¤t_version, + compaction_region.current_version.metadata.region_id, + &compaction_region.current_version, 
Timestamp::current_millis(), ); From 574fd9f7cfa2dac301ea7190affa9e3f905be01a Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 7 Jun 2024 18:47:05 +0800 Subject: [PATCH 30/31] chore: add comments --- src/mito2/src/compaction/compactor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 16d3eaf1e89c..0f528ad311c8 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -374,6 +374,8 @@ impl Compactor for DefaultCompactor { Ok(edit) } + // The default implementation of compact combines the merge_ssts and update_manifest functions. + // Note: It's local compaction and only used for testing purpose. async fn compact( &self, compaction_region: &CompactionRegion, From 77c085c41a92df9294ebbce3ea1f4e6133b27478 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 7 Jun 2024 19:16:32 +0800 Subject: [PATCH 31/31] refactor: add PickerOutput field in CompactorRequest --- src/mito2/src/compaction/compactor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 0f528ad311c8..a6694c8ef7bd 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -69,6 +69,7 @@ pub struct CompactorRequest { pub region_dir: String, pub region_options: HashMap, pub compaction_options: compact_request::Options, + pub picker_output: PickerOutput, } /// Open a compaction region from a compaction request.