diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 5236c5d28d02..6ee21b736eae 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -32,6 +32,7 @@ use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Expr;
+use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
@@ -57,7 +58,7 @@ use crate::region::version::{VersionControlRef, VersionRef};
 use crate::region::ManifestContextRef;
 use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
 use crate::schedule::scheduler::SchedulerRef;
-use crate::sst::file::{FileHandle, FileId, Level};
+use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
 use crate::sst::version::LevelMeta;
 use crate::worker::WorkerListener;
@@ -284,6 +285,7 @@ impl CompactionScheduler {
             cache_manager: cache_manager.clone(),
             access_layer: access_layer.clone(),
             manifest_ctx: manifest_ctx.clone(),
+            file_purger: None,
         };

         let picker_output = {
@@ -441,6 +443,16 @@ pub struct CompactionOutput {
     pub output_time_range: Option<TimestampRange>,
 }

+/// SerializedCompactionOutput is a serializable version of [CompactionOutput], with [FileHandle] replaced by [FileMeta].
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SerializedCompactionOutput {
+    output_file_id: FileId,
+    output_level: Level,
+    inputs: Vec<FileMeta>,
+    filter_deleted: bool,
+    output_time_range: Option<TimestampRange>,
+}
+
 /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
 async fn build_sst_reader(
     metadata: RegionMetadataRef,
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 50cf0d09e606..827aba80cca0 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;

@@ -62,28 +61,26 @@ pub struct CompactionRegion {
     pub(crate) access_layer: AccessLayerRef,
     pub(crate) manifest_ctx: Arc<ManifestContext>,
     pub(crate) current_version: VersionRef,
+    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
 }

-/// CompactorRequest represents the request to compact a region.
+/// OpenCompactionRegionRequest represents the request to open a compaction region.
 #[derive(Debug, Clone)]
-pub struct CompactorRequest {
+pub struct OpenCompactionRegionRequest {
     pub region_id: RegionId,
     pub region_dir: String,
-    pub region_options: HashMap<String, String>,
-    pub compaction_options: compact_request::Options,
-    pub picker_output: PickerOutput,
+    pub region_options: RegionOptions,
 }

 /// Open a compaction region from a compaction request.
 /// It's a simplified version of RegionOpener::open().
 pub async fn open_compaction_region(
-    req: &CompactorRequest,
+    req: &OpenCompactionRegionRequest,
     mito_config: &MitoConfig,
     object_store_manager: ObjectStoreManager,
 ) -> Result<CompactionRegion> {
-    let region_options = RegionOptions::try_from(&req.region_options)?;
     let object_store = {
-        let name = &region_options.storage;
+        let name = &req.region_options.storage;
         if let Some(name) = name {
             object_store_manager
                 .find(name)
@@ -139,8 +136,8 @@ pub async fn open_compaction_region(
     let current_version = {
         let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone()))
             .builder_for_options(
-                region_options.memtable.as_ref(),
-                !region_options.append_mode,
+                req.region_options.memtable.as_ref(),
+                !req.region_options.append_mode,
             );

         // Initial memtable id is 0.
@@ -148,7 +145,7 @@
             region_metadata.clone(),
             memtable_builder.clone(),
             0,
-            region_options.compaction.time_window(),
+            req.region_options.compaction.time_window(),
         ));

         let version = VersionBuilder::new(region_metadata.clone(), mutable)
@@ -157,7 +154,7 @@
             .flushed_sequence(manifest.flushed_sequence)
             .truncated_entry_id(manifest.truncated_entry_id)
             .compaction_time_window(manifest.compaction_time_window)
-            .options(region_options.clone())
+            .options(req.region_options.clone())
             .build();
         let version_control = Arc::new(VersionControl::new(version));
         version_control.current().version
@@ -165,7 +162,7 @@
     Ok(CompactionRegion {
         region_id: req.region_id,
-        region_options: region_options.clone(),
+        region_options: req.region_options.clone(),
         region_dir: req.region_dir.clone(),
         engine_config: Arc::new(mito_config.clone()),
         region_metadata: region_metadata.clone(),
@@ -173,9 +170,16 @@
         access_layer,
         manifest_ctx,
         current_version,
+        file_purger: Some(file_purger),
     })
 }

+impl CompactionRegion {
+    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
+        self.file_purger.clone()
+    }
+}
+
 /// `[MergeOutput]` represents the output of merging SST files.
 #[derive(Default, Clone, Debug, Serialize, Deserialize)]
 pub struct MergeOutput {
diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs
index 715e8effecdc..3ef5e6561215 100644
--- a/src/mito2/src/compaction/picker.rs
+++ b/src/mito2/src/compaction/picker.rs
@@ -16,13 +16,15 @@ use std::fmt::Debug;
 use std::sync::Arc;

 use api::v1::region::compact_request;
+use serde::{Deserialize, Serialize};

 use crate::compaction::compactor::CompactionRegion;
 use crate::compaction::twcs::TwcsPicker;
 use crate::compaction::window::WindowedCompactionPicker;
-use crate::compaction::CompactionOutput;
+use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
 use crate::region::options::CompactionOptions;
-use crate::sst::file::FileHandle;
+use crate::sst::file::{FileHandle, FileMeta};
+use crate::sst::file_purger::FilePurger;

 #[async_trait::async_trait]
 pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
@@ -45,6 +47,76 @@ pub struct PickerOutput {
     pub time_window_size: i64,
 }

+/// SerializedPickerOutput is a serializable version of [PickerOutput], with [CompactionOutput] and [FileHandle] replaced by [SerializedCompactionOutput] and [FileMeta].
+#[derive(Default, Clone, Debug, Serialize, Deserialize)]
+pub struct SerializedPickerOutput {
+    pub outputs: Vec<SerializedCompactionOutput>,
+    pub expired_ssts: Vec<FileMeta>,
+    pub time_window_size: i64,
+}
+
+impl From<&PickerOutput> for SerializedPickerOutput {
+    fn from(input: &PickerOutput) -> Self {
+        let outputs = input
+            .outputs
+            .iter()
+            .map(|output| SerializedCompactionOutput {
+                output_file_id: output.output_file_id,
+                output_level: output.output_level,
+                inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
+                filter_deleted: output.filter_deleted,
+                output_time_range: output.output_time_range,
+            })
+            .collect();
+        let expired_ssts = input
+            .expired_ssts
+            .iter()
+            .map(|s| s.meta_ref().clone())
+            .collect();
+        Self {
+            outputs,
+            expired_ssts,
+            time_window_size: input.time_window_size,
+        }
+    }
+}
+
+impl PickerOutput {
+    /// Converts a [SerializedPickerOutput] to a [PickerOutput].
+    pub fn from_serialized(
+        input: SerializedPickerOutput,
+        file_purger: Arc<dyn FilePurger>,
+    ) -> Self {
+        let outputs = input
+            .outputs
+            .into_iter()
+            .map(|output| CompactionOutput {
+                output_file_id: output.output_file_id,
+                output_level: output.output_level,
+                inputs: output
+                    .inputs
+                    .into_iter()
+                    .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
+                    .collect(),
+                filter_deleted: output.filter_deleted,
+                output_time_range: output.output_time_range,
+            })
+            .collect();
+
+        let expired_ssts = input
+            .expired_ssts
+            .into_iter()
+            .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
+            .collect();
+
+        Self {
+            outputs,
+            expired_ssts,
+            time_window_size: input.time_window_size,
+        }
+    }
+}
+
 /// Create a new picker based on the compaction request options and compaction options.
 pub fn new_picker(
     compact_request_options: compact_request::Options,
@@ -67,3 +139,78 @@
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::compaction::test_util::new_file_handle;
+    use crate::sst::file::FileId;
+    use crate::test_util::new_noop_file_purger;
+
+    #[test]
+    fn test_picker_output_serialization() {
+        let inputs_file_handle = vec![
+            new_file_handle(FileId::random(), 0, 999, 0),
+            new_file_handle(FileId::random(), 0, 999, 0),
+            new_file_handle(FileId::random(), 0, 999, 0),
+        ];
+        let expired_ssts_file_handle = vec![
+            new_file_handle(FileId::random(), 0, 999, 0),
+            new_file_handle(FileId::random(), 0, 999, 0),
+        ];
+
+        let picker_output = PickerOutput {
+            outputs: vec![
+                CompactionOutput {
+                    output_file_id: FileId::random(),
+                    output_level: 0,
+                    inputs: inputs_file_handle.clone(),
+                    filter_deleted: false,
+                    output_time_range: None,
+                },
+                CompactionOutput {
+                    output_file_id: FileId::random(),
+                    output_level: 0,
+                    inputs: inputs_file_handle.clone(),
+                    filter_deleted: false,
+                    output_time_range: None,
+                },
+            ],
+            expired_ssts: expired_ssts_file_handle.clone(),
+            time_window_size: 1000,
+        };
+
+        let picker_output_str =
+            serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
+        let serialized_picker_output: SerializedPickerOutput =
+            serde_json::from_str(&picker_output_str).unwrap();
+        let picker_output_from_serialized =
+            PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());
+
+        picker_output
+            .expired_ssts
+            .iter()
+            .zip(picker_output_from_serialized.expired_ssts.iter())
+            .for_each(|(expected, actual)| {
+                assert_eq!(expected.meta_ref(), actual.meta_ref());
+            });
+
+        picker_output
+            .outputs
+            .iter()
+            .zip(picker_output_from_serialized.outputs.iter())
+            .for_each(|(expected, actual)| {
+                assert_eq!(expected.output_file_id, actual.output_file_id);
+                assert_eq!(expected.output_level, actual.output_level);
+                expected
+                    .inputs
+                    .iter()
+                    .zip(actual.inputs.iter())
+                    .for_each(|(expected, actual)| {
+                        assert_eq!(expected.meta_ref(), actual.meta_ref());
+                    });
+                assert_eq!(expected.filter_deleted, actual.filter_deleted);
+                assert_eq!(expected.output_time_range, actual.output_time_range);
+            });
+    }
+}
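Not part of the diff: a minimal sketch of how the new pieces could be wired together on a standalone compactor node, assuming the picker output arrives as a JSON string produced on the sending side with `serde_json::to_string(&SerializedPickerOutput::from(&picker_output))`, as in the new unit test. The helper name `restore_picker_output`, the panic-style error handling, and the assumption that this code lives inside the `mito2` crate (so `CompactionRegion`, `OpenCompactionRegionRequest`, `open_compaction_region`, `ObjectStoreManager`, `MitoConfig`, `PickerOutput`, `SerializedPickerOutput`, and the crate's `Result` are already in scope) are illustrative only.

```rust
/// Hypothetical helper (not in the diff): open a lightweight view of the region
/// on the compactor side, then rebuild the `PickerOutput` that was shipped to
/// this node as JSON.
pub async fn restore_picker_output(
    req: &OpenCompactionRegionRequest,
    mito_config: &MitoConfig,
    object_store_manager: ObjectStoreManager,
    serialized_picker_output: &str,
) -> Result<(CompactionRegion, PickerOutput)> {
    // `open_compaction_region` also populates `file_purger` with `Some(...)`,
    // unlike the scheduler path in compaction.rs which sets it to `None`.
    let compaction_region =
        open_compaction_region(req, mito_config, object_store_manager).await?;

    // The purger is what turns each serialized `FileMeta` back into a `FileHandle`.
    let file_purger = compaction_region
        .file_purger()
        .expect("open_compaction_region sets a file purger");

    // Reverse of `SerializedPickerOutput::from(&picker_output)` on the sending side.
    let serialized: SerializedPickerOutput = serde_json::from_str(serialized_picker_output)
        .expect("valid SerializedPickerOutput JSON");
    let picker_output = PickerOutput::from_serialized(serialized, file_purger);

    Ok((compaction_region, picker_output))
}
```

The round trip mirrors `test_picker_output_serialization`: convert with `SerializedPickerOutput::from(&picker_output)` and serialize where the picker runs, then deserialize and call `PickerOutput::from_serialized` with a file purger where the compaction is executed.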