Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add Compactor trait to abstract the compaction #4097

Merged
merged 33 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0d5b790
refactor: add Compactor trait
zyy17 Jun 2, 2024
a91c5d6
chore: add compact() in Compactor trait and expose compaction module
zyy17 Jun 2, 2024
37b0464
refactor: add CompactionRequest and open_compaction_region
zyy17 Jun 2, 2024
014bc22
refactor: export the compaction api
zyy17 Jun 3, 2024
f9d1e66
refactor: add DefaultCompactor::new_from_request
zyy17 Jun 3, 2024
3eb0dab
refactor: no need to pass mito_config in open_compaction_region()
zyy17 Jun 3, 2024
8cd55a2
refactor: CompactionRequest -> &CompactionRequest
zyy17 Jun 3, 2024
891426d
fix: typo
zyy17 Jun 3, 2024
8ff9c6f
docs: add docs for public apis
zyy17 Jun 4, 2024
acabc07
refactor: remove 'Picker' from Compactor
zyy17 Jun 4, 2024
de8496c
chore: add logs
zyy17 Jun 4, 2024
6bee93e
chore: change pub attribute for Picker
zyy17 Jun 4, 2024
a63c9f5
refactor: remove do_merge_ssts()
zyy17 Jun 4, 2024
3a360c0
refactor: update comments
zyy17 Jun 4, 2024
35feb02
refactor: use CompactionRegion argument in Picker
zyy17 Jun 5, 2024
55033f9
chore: make compaction module public and remove unnecessary clone
zyy17 Jun 6, 2024
15c102c
refactor: move build_compaction_task() in CompactionScheduler{}
zyy17 Jun 6, 2024
8ad6fff
chore: use in open_compaction_region() and add some comments for pub…
zyy17 Jun 6, 2024
26ae2a9
refactor: add 'manifest_dir()' in store-api
zyy17 Jun 6, 2024
9a05b6a
refactor: move the default implementation to DefaultCompactor
zyy17 Jun 6, 2024
130fb90
refactor: remove Options from MergeOutput
zyy17 Jun 6, 2024
e141e91
chore: minor modification
zyy17 Jun 6, 2024
e3e6f12
fix: clippy errors
zyy17 Jun 6, 2024
93f5a74
fix: unit test errors
zyy17 Jun 6, 2024
17bf852
refactor: remove 'manifest_dir()' from store-api crate(already have o…
zyy17 Jun 6, 2024
48f9398
refactor: use 'region_dir' in CompactionRequest
zyy17 Jun 6, 2024
2f6d9ef
refactor: refine naming
zyy17 Jun 6, 2024
861ea41
chore: sync main branch
zyy17 Jun 7, 2024
d26c64a
refactor: refine naming
zyy17 Jun 7, 2024
4968186
refactor: remove clone()
zyy17 Jun 7, 2024
574fd9f
chore: add comments
zyy17 Jun 7, 2024
77c085c
refactor: add PickerOutput field in CompactorRequest
zyy17 Jun 7, 2024
f8d88a2
chore: sync main branch
zyy17 Jun 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 75 additions & 40 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// limitations under the License.

mod buckets;
mod picker;
pub mod compactor;
pub mod picker;
mod task;
#[cfg(test)]
mod test_util;
Expand All @@ -31,7 +32,6 @@ use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
pub use picker::CompactionPickerRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -40,8 +40,9 @@ use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::compaction::window::WindowedCompactionPicker;
use crate::compaction::compactor::{CompactionRegion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
Expand All @@ -52,7 +53,6 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
Expand Down Expand Up @@ -93,17 +93,6 @@ impl CompactionRequest {
}
}

/// Creates the compaction picker that corresponds to the given [CompactionOptions].
///
/// For [CompactionOptions::Twcs] the picker is a [TwcsPicker] configured with the
/// active/inactive window file limits and the time window (in seconds) taken from
/// the options.
pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef {
    match strategy {
        CompactionOptions::Twcs(opts) => {
            let picker: CompactionPickerRef = Arc::new(TwcsPicker::new(
                opts.max_active_window_files,
                opts.max_inactive_window_files,
                opts.time_window_seconds(),
            ));
            picker
        }
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
scheduler: SchedulerRef,
Expand Down Expand Up @@ -240,34 +229,13 @@ impl CompactionScheduler {
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
let picker = if let compact_request::Options::StrictWindow(window) = &options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
compaction_options_to_picker(&request.current_version.options.compaction)
};

let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);

let pick_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick"])
.start_timer();
let Some(mut task) = picker.pick(request) else {
let Some(mut task) = self.build_compaction_task(request, options) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};

drop(pick_timer);

// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
Expand All @@ -290,6 +258,73 @@ impl CompactionScheduler {
// Notifies all pending tasks.
status.on_failure(err);
}

/// Builds a compaction task from a compaction request.
///
/// A picker is created with [`new_picker`] from `options` and the compaction options of
/// the request's current version. It is then asked to pick against a
/// [`CompactionRegion`] assembled from the request; the time spent picking is recorded
/// under the `"pick"` label of `COMPACTION_STAGE_ELAPSED`.
///
/// Returns `None` when the picker has nothing to compact. The request is consumed by
/// this method, so in that case every waiter is sent `Ok(0)` before returning.
fn build_compaction_task(
    &self,
    req: CompactionRequest,
    options: compact_request::Options,
) -> Option<Box<dyn CompactionTask>> {
    let picker = new_picker(options, &req.current_version.options.compaction);
    let region_id = req.region_id();
    let CompactionRequest {
        engine_config,
        current_version,
        access_layer,
        request_sender,
        waiters,
        file_purger,
        start_time,
        cache_manager,
        manifest_ctx,
        version_control,
        listener,
    } = req;
    debug!(
        "Pick compaction strategy {:?} for region: {}",
        picker, region_id
    );

    // Only the values read out of `current_version` need to be cloned. The other
    // handles were moved out of the request above and are not used again in this
    // method, so they are moved into the region instead of being cloned.
    let compaction_region = CompactionRegion {
        region_id,
        region_options: current_version.options.clone(),
        engine_config,
        region_metadata: current_version.metadata.clone(),
        cache_manager,
        access_layer,
        file_purger,
        manifest_ctx,
        version_control,
    };

    let picker_output = {
        // The timer guard lives only for this block, so it measures just the pick call.
        let _pick_timer = COMPACTION_STAGE_ELAPSED
            .with_label_values(&["pick"])
            .start_timer();
        picker.pick(&compaction_region)
    };

    let Some(picker_output) = picker_output else {
        // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
        for waiter in waiters {
            waiter.send(Ok(0));
        }
        return None;
    };

    let task = CompactionTaskImpl {
        request_sender,
        waiters,
        start_time,
        listener,
        picker_output,
        compaction_region,
        compactor: Arc::new(DefaultCompactor {}),
    };

    Some(Box::new(task))
}
}

impl Drop for CompactionScheduler {
Expand Down Expand Up @@ -409,8 +444,8 @@ impl CompactionStatus {
}
}

#[derive(Debug)]
pub(crate) struct CompactionOutput {
#[derive(Debug, Clone)]
pub struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
Expand Down
Loading