Skip to content

Commit

Permalink
feat(mito2): compaction (GreptimeTeam#2317)
Browse files Browse the repository at this point in the history
* feat: compaction component

* feat: mito2 compaction

* Avoid building time range predicates when merging SST files, since TWCS does not enforce a strict time window.

* fix: some CR comments

* minor: change CompactionRequest::senders to an option

* chore: handle compaction finish error

* feat: integrate compaction into region worker

* chore: rebase upstream

* fix: Some CR comments

* chore: Apply suggestions from code review

* style: fix clippy

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
2 people authored and paomian committed Oct 19, 2023
1 parent c5f276d commit 23c17dc
Show file tree
Hide file tree
Showing 23 changed files with 1,551 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ common-test-util = { workspace = true, optional = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
datatypes = { workspace = true }
futures.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl AccessLayer {
}

/// Returns a new parquet writer to write the SST for specific `file_id`.
// TODO(hl): maybe rename to [sst_writer].
pub(crate) fn write_sst(
&self,
file_id: FileId,
Expand Down
94 changes: 94 additions & 0 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod output;
mod picker;
#[cfg(test)]
mod test_util;
mod twcs;

use std::sync::Arc;
use std::time::Duration;

use common_query::Output;
use common_telemetry::debug;
pub use picker::CompactionPickerRef;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use tokio::sync::{mpsc, oneshot};

use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error;
use crate::error::Result;
use crate::region::version::VersionRef;
use crate::request::WorkerRequest;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;

/// Region compaction request.
///
/// Carries the state handed to a compaction picker when a compaction is
/// scheduled for a region. The request is passed by value to the picker.
pub struct CompactionRequest {
/// Version of the region to compact; the region id is read from its metadata.
pub(crate) current_version: VersionRef,
/// Access layer of the region.
/// NOTE(review): presumably used to read input SSTs and write the output SST — confirm against the picker.
pub(crate) access_layer: AccessLayerRef,
/// Optional TTL.
/// NOTE(review): presumably the time-to-live of the region's data — confirm.
pub(crate) ttl: Option<Duration>,
/// Optional compaction time window.
/// NOTE(review): the unit is not visible here — confirm against the picker.
pub(crate) compaction_time_window: Option<i64>,
/// Channel for sending [WorkerRequest]s.
/// NOTE(review): presumably used to report the compaction outcome back to the region worker — confirm.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Optional one-shot sender that carries a `Result<Output>`.
/// NOTE(review): presumably notifies a caller waiting for the compaction to finish — confirm.
pub(crate) waiter: Option<oneshot::Sender<error::Result<Output>>>,
/// File purger of the region.
/// NOTE(review): presumably used to clean up files that are no longer needed — confirm.
pub(crate) file_purger: FilePurgerRef,
}

impl CompactionRequest {
    /// Returns the id of the region this request targets, as recorded in the
    /// metadata of the request's current version.
    pub(crate) fn region_id(&self) -> RegionId {
        let metadata = &self.current_version.metadata;
        metadata.region_id
    }
}

/// Creates the compaction picker that corresponds to the given [CompactionStrategy].
///
/// For the TWCS strategy, the picker is a [TwcsPicker] configured with the
/// active/inactive window file limits and the time window (in seconds) taken
/// from the strategy options.
pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef {
    match strategy {
        CompactionStrategy::Twcs(opts) => {
            let picker: CompactionPickerRef = Arc::new(TwcsPicker::new(
                opts.max_active_window_files,
                opts.max_inactive_window_files,
                opts.time_window_seconds,
            ));
            picker
        }
    }
}

/// Schedules region compaction tasks on an underlying scheduler.
pub(crate) struct CompactionScheduler {
/// Scheduler to which compaction jobs are submitted.
scheduler: SchedulerRef,
// TODO(hl): maybe track region compaction status in CompactionScheduler
}

impl CompactionScheduler {
    /// Creates a compaction scheduler that submits its jobs to `scheduler`.
    pub(crate) fn new(scheduler: SchedulerRef) -> Self {
        Self { scheduler }
    }

    /// Schedules a region compaction task.
    ///
    /// The submitted job builds a picker, lets it pick a task for `req` and,
    /// if a task was picked, runs it to completion. The returned value is the
    /// result of submitting the job to the underlying scheduler.
    pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> {
        let job = async {
            // TODO(hl): build picker according to region options.
            let picker =
                compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
            debug!(
                "Pick compaction strategy {:?} for region: {}",
                picker,
                req.region_id()
            );
            // Nothing to run when the picker does not produce a task.
            if let Some(mut task) = picker.pick(req) {
                task.run().await;
            }
        };
        self.scheduler.schedule(Box::pin(job))
    }
}
88 changes: 88 additions & 0 deletions src/mito2/src/compaction/output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::access_layer::AccessLayerRef;
use crate::error;
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::parquet::{SstInfo, WriteOptions};

/// Describes a single compaction output: the SST file to produce and the
/// input files it is built from.
#[derive(Debug)]
pub(crate) struct CompactionOutput {
/// Id of the output SST file.
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// The left bound of time window.
pub time_window_bound: i64,
/// Time window size in seconds.
pub time_window_sec: i64,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}

impl CompactionOutput {
    /// Reads all input files and writes their content into the output SST file.
    ///
    /// - `region_id`: id recorded in the returned file meta.
    /// - `schema`: region metadata used to build both the reader and the writer.
    /// - `sst_layer`: access layer used to read the inputs and write the output.
    /// - `sst_write_buffer_size`: write buffer size passed to the SST writer.
    ///
    /// Returns the meta of the written file, or `None` when the writer reports
    /// no SST info. Errors from building the reader or from writing are
    /// propagated to the caller.
    pub(crate) async fn build(
        &self,
        region_id: RegionId,
        schema: RegionMetadataRef,
        sst_layer: AccessLayerRef,
        sst_write_buffer_size: ReadableSize,
    ) -> error::Result<Option<FileMeta>> {
        let merged = build_sst_reader(schema.clone(), sst_layer.clone(), &self.inputs).await?;

        let write_opts = WriteOptions {
            write_buffer_size: sst_write_buffer_size,
            ..Default::default()
        };

        // TODO(hl): measure merge elapsed time.

        let mut sst_writer =
            sst_layer.write_sst(self.output_file_id, schema, Source::Reader(merged));

        // No SST info means there is no file meta to report.
        let Some(SstInfo {
            time_range,
            file_size,
            ..
        }) = sst_writer.write_all(&write_opts).await?
        else {
            return Ok(None);
        };

        Ok(Some(FileMeta {
            region_id,
            file_id: self.output_file_id,
            time_range,
            level: self.output_level,
            file_size,
        }))
    }
}

/// Creates a [BoxedBatchReader] over all `inputs` that yields batches in primary key order.
///
/// All columns of `schema` are projected. Errors from building the projection
/// mapper or the reader are returned to the caller.
async fn build_sst_reader(
    schema: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    inputs: &[FileHandle],
) -> error::Result<BoxedBatchReader> {
    let mapper = ProjectionMapper::all(&schema)?;
    let files = inputs.to_vec();
    SeqScan::new(sst_layer, mapper)
        .with_files(files)
        .build_reader()
        .await
}
47 changes: 47 additions & 0 deletions src/mito2/src/compaction/picker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::sync::Arc;

use crate::compaction::CompactionRequest;

/// Reference-counted handle to a [Picker] trait object that is `Send` and `Sync`.
pub type CompactionPickerRef = Arc<dyn Picker + Send + Sync>;

/// A compaction task that can be run asynchronously.
#[async_trait::async_trait]
pub trait CompactionTask: Debug + Send + Sync + 'static {
/// Runs the task. Nothing is returned, so the outcome is not reported
/// through the return value.
async fn run(&mut self);
}

/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategies may implement different pickers.
pub trait Picker: Debug + Send + 'static {
/// Consumes the compaction request and returns the task to run, or `None`
/// when no task is built for the request.
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
}

/// Context passed to compaction pickers.
///
/// Currently it only carries the optional compaction time window.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PickerContext {
    /// Optional compaction time window.
    /// NOTE(review): presumably in seconds — confirm against the pickers.
    compaction_time_window: Option<i64>,
}

impl PickerContext {
    /// Creates a context with the given optional compaction time window.
    pub fn with(compaction_time_window: Option<i64>) -> Self {
        Self {
            compaction_time_window,
        }
    }

    /// Returns the compaction time window, or `None` if it is not set.
    pub fn compaction_time_window(&self) -> Option<i64> {
        self.compaction_time_window
    }
}
41 changes: 41 additions & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::Timestamp;

use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::test_util::new_noop_file_purger;

/// Test helper that builds a [FileHandle] for a file with the given id, level
/// and millisecond time range.
///
/// The handle uses region id 0, a file size of 0 and a no-op file purger.
pub fn new_file_handle(
    file_id: FileId,
    start_ts_millis: i64,
    end_ts_millis: i64,
    level: Level,
) -> FileHandle {
    let time_range = (
        Timestamp::new_millisecond(start_ts_millis),
        Timestamp::new_millisecond(end_ts_millis),
    );
    let meta = FileMeta {
        region_id: 0.into(),
        file_id,
        time_range,
        level,
        file_size: 0,
    };
    FileHandle::new(meta, new_noop_file_purger())
}
Loading

0 comments on commit 23c17dc

Please sign in to comment.