Skip to content

Commit

Permalink
refactor: add SerializedPickerOutput and field modification of `Com…
Browse files Browse the repository at this point in the history
…pactorRequest` (#4198)

* refactor: remove compaction_options and use RegionOptions type for region_options

* refactor: add file_purger field in CompactionRegion

* refactor: add SerializedPickerOutput

* refactor: rename CompactorRequest to OpenCompactionRegionRequest and remove PickerOutput

* refactor: use &PickerOutput instead of clone()
  • Loading branch information
zyy17 authored Jun 25, 2024
1 parent 4d4a6cd commit 948c869
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 17 deletions.
14 changes: 13 additions & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -57,7 +58,7 @@ use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileId, Level};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

Expand Down Expand Up @@ -284,6 +285,7 @@ impl CompactionScheduler {
cache_manager: cache_manager.clone(),
access_layer: access_layer.clone(),
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
};

let picker_output = {
Expand Down Expand Up @@ -441,6 +443,16 @@ pub struct CompactionOutput {
pub output_time_range: Option<TimestampRange>,
}

/// SerializedCompactionOutput is a serializable version of [CompactionOutput] in which every
/// [FileHandle] is replaced by its [FileMeta].
///
/// The fields carry no `pub` modifier, so they are only reachable from this module and its
/// descendants. With the derived `Serialize`/`Deserialize` impls the field names are part of
/// the serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
/// Id of the output file; mirrors `CompactionOutput::output_file_id`.
output_file_id: FileId,
/// Level of the output file; mirrors `CompactionOutput::output_level`.
output_level: Level,
/// Metadata of the input files, stored as [FileMeta] instead of [FileHandle].
inputs: Vec<FileMeta>,
/// Mirrors `CompactionOutput::filter_deleted`.
/// NOTE(review): presumably controls whether deleted entries are dropped while merging — confirm.
filter_deleted: bool,
/// Mirrors `CompactionOutput::output_time_range`; `None` when no range is set.
output_time_range: Option<TimestampRange>,
}

/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
metadata: RegionMetadataRef,
Expand Down
32 changes: 18 additions & 14 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -62,28 +61,26 @@ pub struct CompactionRegion {
pub(crate) access_layer: AccessLayerRef,
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: VersionRef,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
}

/// CompactorRequest represents the request to compact a region.
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct CompactorRequest {
pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: HashMap<String, String>,
pub compaction_options: compact_request::Options,
pub picker_output: PickerOutput,
pub region_options: RegionOptions,
}

/// Open a compaction region from a compaction request.
/// It's a simplified version of RegionOpener::open().
pub async fn open_compaction_region(
req: &CompactorRequest,
req: &OpenCompactionRegionRequest,
mito_config: &MitoConfig,
object_store_manager: ObjectStoreManager,
) -> Result<CompactionRegion> {
let region_options = RegionOptions::try_from(&req.region_options)?;
let object_store = {
let name = &region_options.storage;
let name = &req.region_options.storage;
if let Some(name) = name {
object_store_manager
.find(name)
Expand Down Expand Up @@ -139,16 +136,16 @@ pub async fn open_compaction_region(
let current_version = {
let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone()))
.builder_for_options(
region_options.memtable.as_ref(),
!region_options.append_mode,
req.region_options.memtable.as_ref(),
!req.region_options.append_mode,
);

// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
region_metadata.clone(),
memtable_builder.clone(),
0,
region_options.compaction.time_window(),
req.region_options.compaction.time_window(),
));

let version = VersionBuilder::new(region_metadata.clone(), mutable)
Expand All @@ -157,25 +154,32 @@ pub async fn open_compaction_region(
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(region_options.clone())
.options(req.region_options.clone())
.build();
let version_control = Arc::new(VersionControl::new(version));
version_control.current().version
};

Ok(CompactionRegion {
region_id: req.region_id,
region_options: region_options.clone(),
region_options: req.region_options.clone(),
region_dir: req.region_dir.clone(),
engine_config: Arc::new(mito_config.clone()),
region_metadata: region_metadata.clone(),
cache_manager: Arc::new(CacheManager::default()),
access_layer,
manifest_ctx,
current_version,
file_purger: Some(file_purger),
})
}

impl CompactionRegion {
    /// Returns a new reference to the file purger stored in this region.
    ///
    /// Yields `None` when the `file_purger` field is `None`; otherwise the returned `Arc`
    /// points at the same purger instance as the one held by the region.
    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
        self.file_purger.as_ref().map(Arc::clone)
    }
}

/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
Expand Down
151 changes: 149 additions & 2 deletions src/mito2/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use std::fmt::Debug;
use std::sync::Arc;

use api::v1::region::compact_request;
use serde::{Deserialize, Serialize};

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::twcs::TwcsPicker;
use crate::compaction::window::WindowedCompactionPicker;
use crate::compaction::CompactionOutput;
use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
use crate::region::options::CompactionOptions;
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::FilePurger;

#[async_trait::async_trait]
pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
Expand All @@ -45,6 +47,76 @@ pub struct PickerOutput {
pub time_window_size: i64,
}

/// SerializedPickerOutput is a serializable version of [PickerOutput]: each [CompactionOutput]
/// is replaced by a [SerializedCompactionOutput] and each [FileHandle] by its [FileMeta].
///
/// Build one with `SerializedPickerOutput::from(&picker_output)` and turn it back into a
/// [PickerOutput] with `PickerOutput::from_serialized`.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct SerializedPickerOutput {
/// Serializable form of `PickerOutput::outputs`, in the same order.
pub outputs: Vec<SerializedCompactionOutput>,
/// Metadata of the files listed in `PickerOutput::expired_ssts`, in the same order.
pub expired_ssts: Vec<FileMeta>,
/// Copied verbatim from `PickerOutput::time_window_size`.
/// NOTE(review): the unit is not visible here — confirm against the picker.
pub time_window_size: i64,
}

impl From<&PickerOutput> for SerializedPickerOutput {
fn from(input: &PickerOutput) -> Self {
let outputs = input
.outputs
.iter()
.map(|output| SerializedCompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
filter_deleted: output.filter_deleted,
output_time_range: output.output_time_range,
})
.collect();
let expired_ssts = input
.expired_ssts
.iter()
.map(|s| s.meta_ref().clone())
.collect();
Self {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
}
}
}

impl PickerOutput {
/// Converts a [SerializedPickerOutput] to a [PickerOutput].
pub fn from_serialized(
input: SerializedPickerOutput,
file_purger: Arc<dyn FilePurger>,
) -> Self {
let outputs = input
.outputs
.into_iter()
.map(|output| CompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output
.inputs
.into_iter()
.map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
.collect(),
filter_deleted: output.filter_deleted,
output_time_range: output.output_time_range,
})
.collect();

let expired_ssts = input
.expired_ssts
.into_iter()
.map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
.collect();

Self {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
}
}
}

/// Create a new picker based on the compaction request options and compaction options.
pub fn new_picker(
compact_request_options: compact_request::Options,
Expand All @@ -67,3 +139,78 @@ pub fn new_picker(
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::test_util::new_file_handle;
    use crate::sst::file::FileId;
    use crate::test_util::new_noop_file_purger;

    /// Round-trips a [PickerOutput] through [SerializedPickerOutput] and JSON, then checks
    /// that the rebuilt value matches the original field by field.
    #[test]
    fn test_picker_output_serialization() {
        let inputs_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        let expired_ssts_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_file_id: FileId::random(),
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_file_id: FileId::random(),
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: expired_ssts_file_handle.clone(),
            time_window_size: 1000,
        };

        let picker_output_str =
            serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
        let serialized_picker_output: SerializedPickerOutput =
            serde_json::from_str(&picker_output_str).unwrap();
        let picker_output_from_serialized =
            PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());

        assert_eq!(
            picker_output.time_window_size,
            picker_output_from_serialized.time_window_size
        );

        // `zip` stops at the shorter iterator, so without the explicit length checks a round
        // trip that drops elements would still pass the element-wise comparisons below.
        assert_eq!(
            picker_output.expired_ssts.len(),
            picker_output_from_serialized.expired_ssts.len()
        );
        picker_output
            .expired_ssts
            .iter()
            .zip(picker_output_from_serialized.expired_ssts.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.meta_ref(), actual.meta_ref());
            });

        assert_eq!(
            picker_output.outputs.len(),
            picker_output_from_serialized.outputs.len()
        );
        picker_output
            .outputs
            .iter()
            .zip(picker_output_from_serialized.outputs.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.output_file_id, actual.output_file_id);
                assert_eq!(expected.output_level, actual.output_level);
                assert_eq!(expected.inputs.len(), actual.inputs.len());
                expected
                    .inputs
                    .iter()
                    .zip(actual.inputs.iter())
                    .for_each(|(expected, actual)| {
                        assert_eq!(expected.meta_ref(), actual.meta_ref());
                    });
                assert_eq!(expected.filter_deleted, actual.filter_deleted);
                assert_eq!(expected.output_time_range, actual.output_time_range);
            });
    }
}

0 comments on commit 948c869

Please sign in to comment.