fix(mito): compaction scheduler schedules more tasks than expected (#2466)

* test: test on_compaction_finished

* fix: avoid submit same region to compact

* feat: persist and recover compaction time window

* test: fix test

* test: sort like result
evenyag authored Sep 22, 2023
1 parent c9f8b9c commit c6e95ff
Showing 12 changed files with 219 additions and 24 deletions.
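At its core, the fix changes how the scheduler tracks in-flight compactions: instead of a `compacting: bool` flag on `CompactionStatus` (with entries created via `or_insert_with`, which could resubmit a region that was already compacting), the mere presence of a `region_id` entry in `region_status` now means a task is in flight, and any further request for that region is merged into a single pending request. Below is a minimal sketch of that invariant with simplified stand-in types (`Status`, `Scheduler`, a `String` request), not the actual mito2 API:

```rust
use std::collections::HashMap;

// Sketch of the scheduling invariant this commit introduces: a key in
// `region_status` means a compaction task is already running for that
// region, so a second request only fills the pending slot instead of
// submitting another job.
struct Status {
    pending: Option<String>, // merged pending request, simplified
}

#[derive(Default)]
struct Scheduler {
    region_status: HashMap<u64, Status>,
    submitted_jobs: usize,
}

impl Scheduler {
    fn schedule_compaction(&mut self, region_id: u64, req: String) {
        if let Some(status) = self.region_status.get_mut(&region_id) {
            // Region is already compacting: remember the request, don't submit.
            status.pending = Some(req);
            return;
        }
        // First request for this region: its entry marks it as compacting.
        self.region_status.insert(region_id, Status { pending: None });
        self.submitted_jobs += 1;
    }

    fn on_compaction_finished(&mut self, region_id: u64) {
        let pending = match self.region_status.get_mut(&region_id) {
            Some(status) => status.pending.take(),
            None => return,
        };
        if pending.is_some() {
            self.submitted_jobs += 1; // schedule the follow-up compaction
        } else {
            self.region_status.remove(&region_id); // nothing queued; region is idle
        }
    }
}

fn main() {
    let mut s = Scheduler::default();
    s.schedule_compaction(1, "req-a".into());
    s.schedule_compaction(1, "req-b".into()); // merged, not submitted
    assert_eq!(s.submitted_jobs, 1);
    s.on_compaction_finished(1);
    assert_eq!(s.submitted_jobs, 2); // the pending request runs next
}
```

When a task finishes, the pending request (if any) becomes the next job, so each region has at most one running task and at most one queued request.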
150 changes: 131 additions & 19 deletions src/mito2/src/compaction.rs
@@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
 pub struct CompactionRequest {
     pub(crate) current_version: VersionRef,
     pub(crate) access_layer: AccessLayerRef,
-    pub(crate) compaction_time_window: Option<i64>,
     /// Sender to send notification to the region worker.
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
     /// Waiters of the compaction request.
@@ -101,24 +100,21 @@ impl CompactionScheduler {
         file_purger: &FilePurgerRef,
         waiter: OptionOutputTx,
     ) -> Result<()> {
-        let status = self.region_status.entry(region_id).or_insert_with(|| {
-            CompactionStatus::new(
-                region_id,
-                version_control.clone(),
-                access_layer.clone(),
-                file_purger.clone(),
-            )
-        });
-        if status.compacting {
+        if let Some(status) = self.region_status.get_mut(&region_id) {
             // Region is compacting. Add the waiter to pending list.
             status.merge_waiter(waiter);
             return Ok(());
         }

         // The region can compact directly.
+        let mut status = CompactionStatus::new(
+            region_id,
+            version_control.clone(),
+            access_layer.clone(),
+            file_purger.clone(),
+        );
         let request = status.new_compaction_request(self.request_sender.clone(), waiter);
-        // Mark the region as compacting.
-        status.compacting = true;
+        self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request)
     }

@@ -127,7 +123,6 @@ impl CompactionScheduler {
         let Some(status) = self.region_status.get_mut(&region_id) else {
             return;
         };
-        status.compacting = false;
         // We should always try to compact the region until picker returns None.
         let request =
             status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
@@ -252,8 +247,6 @@ struct CompactionStatus {
     access_layer: AccessLayerRef,
     /// File purger of the region.
     file_purger: FilePurgerRef,
-    /// Whether a compaction task is running.
-    compacting: bool,
     /// Compaction pending to schedule.
     ///
     /// For simplicity, we merge all pending compaction requests into one.
@@ -273,7 +266,6 @@ impl CompactionStatus {
             version_control,
             access_layer,
             file_purger,
-            compacting: false,
             pending_compaction: None,
         }
     }
@@ -306,8 +298,6 @@ impl CompactionStatus {
         let mut req = CompactionRequest {
             current_version,
             access_layer: self.access_layer.clone(),
-            // TODO(hl): get persisted region compaction time window
-            compaction_time_window: None,
             request_sender: request_sender.clone(),
             waiters: Vec::new(),
             file_purger: self.file_purger.clone(),
@@ -324,12 +314,15 @@

 #[cfg(test)]
 mod tests {
+    use std::sync::Mutex;
+
     use common_query::Output;
     use tokio::sync::oneshot;

     use super::*;
+    use crate::schedule::scheduler::{Job, Scheduler};
     use crate::test_util::scheduler_util::SchedulerEnv;
-    use crate::test_util::version_util::VersionControlBuilder;
+    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

     #[tokio::test]
     async fn test_schedule_empty() {
@@ -373,4 +366,123 @@ mod tests {
         assert!(matches!(output, Output::AffectedRows(0)));
         assert!(scheduler.region_status.is_empty());
     }
+
+    #[derive(Default)]
+    struct VecScheduler {
+        jobs: Mutex<Vec<Job>>,
+    }
+
+    impl VecScheduler {
+        fn num_jobs(&self) -> usize {
+            self.jobs.lock().unwrap().len()
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Scheduler for VecScheduler {
+        fn schedule(&self, job: Job) -> Result<()> {
+            self.jobs.lock().unwrap().push(job);
+            Ok(())
+        }
+
+        async fn stop(&self, _await_termination: bool) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schedule_on_finished() {
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_compaction_scheduler(tx);
+        let mut builder = VersionControlBuilder::new();
+        let purger = builder.file_purger();
+        let region_id = builder.region_id();
+
+        // 5 files to compact.
+        let end = 1000 * 1000;
+        let version_control = Arc::new(
+            builder
+                .push_l0_file(0, end)
+                .push_l0_file(10, end)
+                .push_l0_file(50, end)
+                .push_l0_file(80, end)
+                .push_l0_file(90, end)
+                .build(),
+        );
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        // Should schedule 1 compaction.
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        let data = version_control.current();
+        let file_metas: Vec<_> = data.version.ssts.levels()[0]
+            .files
+            .values()
+            .map(|file| file.meta())
+            .collect();
+
+        // 5 files for next compaction and removes old files.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &file_metas,
+            purger.clone(),
+        );
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+
+        // On compaction finished and schedule next compaction.
+        scheduler.on_compaction_finished(region_id);
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(2, job_scheduler.num_jobs());
+        // 5 files for next compaction.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &[],
+            purger.clone(),
+        );
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(2, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+    }
 }
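The `VecScheduler` in the test above is a test double: it satisfies the scheduler interface by recording jobs instead of running them, which is what lets the test assert exact submission counts via `num_jobs()`. A self-contained sketch of the same pattern, with simplified stand-ins for mito2's `Job` and `Scheduler` types:

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins for the real mito2 types.
type Job = Box<dyn FnOnce() + Send>;

trait Scheduler {
    fn schedule(&self, job: Job);
}

#[derive(Default)]
struct VecScheduler {
    jobs: Mutex<Vec<Job>>,
}

impl Scheduler for VecScheduler {
    fn schedule(&self, job: Job) {
        // Record the job instead of executing it.
        self.jobs.lock().unwrap().push(job);
    }
}

fn main() {
    let scheduler = Arc::new(VecScheduler::default());
    scheduler.schedule(Box::new(|| println!("compact region 1")));
    scheduler.schedule(Box::new(|| println!("compact region 2")));
    // A test can now assert on the number of captured jobs.
    assert_eq!(scheduler.jobs.lock().unwrap().len(), 2);
}
```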
9 changes: 7 additions & 2 deletions src/mito2/src/compaction/twcs.rs
@@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
         let CompactionRequest {
             current_version,
             access_layer,
-            compaction_time_window,
             request_sender,
             waiters,
             file_purger,
@@ -138,6 +137,9 @@
             expired_ssts.iter().for_each(|f| f.set_compacting(true));
         }

+        let compaction_time_window = current_version
+            .compaction_time_window
+            .map(|window| window.as_secs() as i64);
         let time_window_size = compaction_time_window
             .or(self.time_window_seconds)
             .unwrap_or_else(|| {
@@ -169,7 +171,7 @@ impl Picker for TwcsPicker {
             outputs,
             expired_ssts,
             sst_write_buffer_size: ReadableSize::mb(4),
-            compaction_time_window: None,
+            compaction_time_window: Some(time_window_size),
             request_sender,
             waiters,
             file_purger,
@@ -357,6 +359,9 @@ impl CompactionTask for TwcsCompactionTask {
                 compacted_files: deleted,
                 senders: std::mem::take(&mut self.waiters),
                 file_purger: self.file_purger.clone(),
+                compaction_time_window: self
+                    .compaction_time_window
+                    .map(|seconds| Duration::from_secs(seconds as u64)),
             })
         }
         Err(e) => {
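With the field removed from `CompactionRequest`, the picker resolves the compaction window in a fixed order: the window persisted in the region version, then the picker's configured `time_window_seconds`, and only as a last resort a window inferred from the SST time ranges. A minimal sketch of that fallback chain, using simplified types rather than the mito2 API:

```rust
use std::time::Duration;

// Fallback chain for the TWCS time window after this change: persisted
// window first, configured window second, inference as a last resort.
fn resolve_time_window(
    persisted: Option<Duration>,  // from the region version / manifest
    configured: Option<i64>,      // picker option, in seconds
    infer: impl FnOnce() -> i64,  // derive a window from SST time ranges
) -> i64 {
    persisted
        .map(|w| w.as_secs() as i64)
        .or(configured)
        .unwrap_or_else(infer)
}

fn main() {
    // A persisted window wins over configuration and inference.
    let w = resolve_time_window(Some(Duration::from_secs(3600)), Some(7200), || 86400);
    assert_eq!(w, 3600);
    // With nothing persisted or configured, fall back to inference.
    let w = resolve_time_window(None, None, || 86400);
    assert_eq!(w, 86400);
}
```

The resolved window is then written back into the compaction output (`compaction_time_window: Some(time_window_size)`), so later picks reuse it instead of re-deriving a possibly different window.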
13 changes: 12 additions & 1 deletion src/mito2/src/manifest/action.rs
@@ -15,6 +15,7 @@
 //! Defines [RegionMetaAction] related structs and [RegionCheckpoint].

 use std::collections::HashMap;
+use std::time::Duration;

 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
@@ -49,7 +50,8 @@ pub struct RegionChange {
 pub struct RegionEdit {
     pub files_to_add: Vec<FileMeta>,
     pub files_to_remove: Vec<FileMeta>,
-    pub compaction_time_window: Option<i64>,
+    #[serde(with = "humantime_serde")]
+    pub compaction_time_window: Option<Duration>,
     pub flushed_entry_id: Option<EntryId>,
     pub flushed_sequence: Option<SequenceNumber>,
 }
@@ -84,6 +86,9 @@ pub struct RegionManifest {
     pub manifest_version: ManifestVersion,
     /// Last WAL entry id of truncated data.
     pub truncated_entry_id: Option<EntryId>,
+    /// Inferred compaction time window.
+    #[serde(with = "humantime_serde")]
+    pub compaction_time_window: Option<Duration>,
 }

 #[derive(Debug, Default)]
@@ -94,6 +99,7 @@ pub struct RegionManifestBuilder {
     flushed_sequence: SequenceNumber,
     manifest_version: ManifestVersion,
     truncated_entry_id: Option<EntryId>,
+    compaction_time_window: Option<Duration>,
 }

 impl RegionManifestBuilder {
@@ -107,6 +113,7 @@ impl RegionManifestBuilder {
                 manifest_version: s.manifest_version,
                 flushed_sequence: s.flushed_sequence,
                 truncated_entry_id: s.truncated_entry_id,
+                compaction_time_window: s.compaction_time_window,
             }
         } else {
             Default::default()
@@ -132,6 +139,9 @@ impl RegionManifestBuilder {
         if let Some(flushed_sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
         }
+        if let Some(window) = edit.compaction_time_window {
+            self.compaction_time_window = Some(window);
+        }
     }

     pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
@@ -156,6 +166,7 @@ impl RegionManifestBuilder {
             flushed_sequence: self.flushed_sequence,
             manifest_version: self.manifest_version,
             truncated_entry_id: self.truncated_entry_id,
+            compaction_time_window: self.compaction_time_window,
         })
     }
 }
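The new `compaction_time_window` fields are annotated with `#[serde(with = "humantime_serde")]`, so the `Option<Duration>` round-trips through the manifest JSON as a human-readable duration string. A small sketch of that round trip (assuming the `serde`, `serde_json`, and `humantime-serde` crates; `Edit` here is a stand-in, not the real `RegionEdit`):

```rust
use std::time::Duration;

use serde::{Deserialize, Serialize};

// Stand-in struct demonstrating the same serde attribute the manifest
// actions use for their compaction_time_window fields.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Edit {
    #[serde(with = "humantime_serde")]
    compaction_time_window: Option<Duration>,
}

fn main() {
    let edit = Edit {
        compaction_time_window: Some(Duration::from_secs(3600)),
    };
    let json = serde_json::to_string(&edit).unwrap();
    // Serialized as a humantime string rather than a bare integer.
    assert_eq!(json, r#"{"compaction_time_window":"1h"}"#);
    let back: Edit = serde_json::from_str(&json).unwrap();
    assert_eq!(back, edit);
}
```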
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/tests/checkpoint.rs
@@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
         .await
         .unwrap();
     let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
-    let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
+    let expected_json = "{\"size\":846,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
     assert_eq!(expected_json, raw_json);

     // reopen the manager
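The expected checkpoint size grows from 816 to 846 bytes, consistent with the 30 bytes added by the new `,"compaction_time_window":null` entry in the serialized `RegionManifest`.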
1 change: 1 addition & 0 deletions src/mito2/src/region/opener.rs
@@ -218,6 +218,7 @@ impl RegionOpener {
             .flushed_entry_id(manifest.flushed_entry_id)
             .flushed_sequence(manifest.flushed_sequence)
             .truncated_entry_id(manifest.truncated_entry_id)
+            .compaction_time_window(manifest.compaction_time_window)
             .options(options)
             .build();
         let flushed_entry_id = version.flushed_entry_id;
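Taken together, these changes close the loop on the time window: the TWCS picker resolves a window and hands it to the compaction task, the finished task reports it back in a `RegionEdit`, the manifest builder persists it, and `RegionOpener` restores it into the region version on reopen, where the picker finds it on the next compaction instead of re-inferring it.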
