diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 23ce3517e0ed..38327f10ca81 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
 pub struct CompactionRequest {
     pub(crate) current_version: VersionRef,
     pub(crate) access_layer: AccessLayerRef,
-    pub(crate) compaction_time_window: Option<i64>,
     /// Sender to send notification to the region worker.
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
     /// Waiters of the compaction request.
@@ -101,24 +100,21 @@ impl CompactionScheduler {
         file_purger: &FilePurgerRef,
         waiter: OptionOutputTx,
     ) -> Result<()> {
-        let status = self.region_status.entry(region_id).or_insert_with(|| {
-            CompactionStatus::new(
-                region_id,
-                version_control.clone(),
-                access_layer.clone(),
-                file_purger.clone(),
-            )
-        });
-        if status.compacting {
+        if let Some(status) = self.region_status.get_mut(&region_id) {
             // Region is compacting. Add the waiter to pending list.
             status.merge_waiter(waiter);
             return Ok(());
         }

         // The region can compact directly.
+        let mut status = CompactionStatus::new(
+            region_id,
+            version_control.clone(),
+            access_layer.clone(),
+            file_purger.clone(),
+        );
         let request = status.new_compaction_request(self.request_sender.clone(), waiter);
-        // Mark the region as compacting.
-        status.compacting = true;
+        self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request)
     }

@@ -127,7 +123,6 @@ impl CompactionScheduler {
         let Some(status) = self.region_status.get_mut(&region_id) else {
             return;
         };
-        status.compacting = false;
         // We should always try to compact the region until picker returns None.
         let request =
             status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
@@ -252,8 +247,6 @@ struct CompactionStatus {
     access_layer: AccessLayerRef,
     /// File purger of the region.
     file_purger: FilePurgerRef,
-    /// Whether a compaction task is running.
-    compacting: bool,
     /// Compaction pending to schedule.
     ///
     /// For simplicity, we merge all pending compaction requests into one.
@@ -273,7 +266,6 @@ impl CompactionStatus {
             version_control,
             access_layer,
             file_purger,
-            compacting: false,
             pending_compaction: None,
         }
     }
@@ -306,8 +298,6 @@ impl CompactionStatus {
         let mut req = CompactionRequest {
             current_version,
             access_layer: self.access_layer.clone(),
-            // TODO(hl): get persisted region compaction time window
-            compaction_time_window: None,
             request_sender: request_sender.clone(),
             waiters: Vec::new(),
             file_purger: self.file_purger.clone(),
@@ -324,12 +314,15 @@

 #[cfg(test)]
 mod tests {
+    use std::sync::Mutex;
+
     use common_query::Output;
     use tokio::sync::oneshot;

     use super::*;
+    use crate::schedule::scheduler::{Job, Scheduler};
     use crate::test_util::scheduler_util::SchedulerEnv;
-    use crate::test_util::version_util::VersionControlBuilder;
+    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

     #[tokio::test]
     async fn test_schedule_empty() {
@@ -373,4 +366,123 @@ mod tests {
         assert!(matches!(output, Output::AffectedRows(0)));
         assert!(scheduler.region_status.is_empty());
     }
+
+    #[derive(Default)]
+    struct VecScheduler {
+        jobs: Mutex<Vec<Job>>,
+    }
+
+    impl VecScheduler {
+        fn num_jobs(&self) -> usize {
+            self.jobs.lock().unwrap().len()
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Scheduler for VecScheduler {
+        fn schedule(&self, job: Job) -> Result<()> {
+            self.jobs.lock().unwrap().push(job);
+            Ok(())
+        }
+
+        async fn stop(&self, _await_termination: bool) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schedule_on_finished() {
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_compaction_scheduler(tx);
+        let mut builder = VersionControlBuilder::new();
+        let purger = builder.file_purger();
+        let region_id = builder.region_id();
+
+        // 5 files to compact.
+        let end = 1000 * 1000;
+        let version_control = Arc::new(
+            builder
+                .push_l0_file(0, end)
+                .push_l0_file(10, end)
+                .push_l0_file(50, end)
+                .push_l0_file(80, end)
+                .push_l0_file(90, end)
+                .build(),
+        );
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        // Should schedule 1 compaction.
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        let data = version_control.current();
+        let file_metas: Vec<_> = data.version.ssts.levels()[0]
+            .files
+            .values()
+            .map(|file| file.meta())
+            .collect();
+
+        // Add 5 new files for the next compaction and remove the old files.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &file_metas,
+            purger.clone(),
+        );
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+
+        // The compaction finishes and schedules the next compaction.
+        scheduler.on_compaction_finished(region_id);
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(2, job_scheduler.num_jobs());
+        // Add 5 files for the next compaction.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &[],
+            purger.clone(),
+        );
+        // The task is pending.
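The compaction.rs change above replaces the `compacting: bool` flag with map presence: a region has an entry in `region_status` exactly while a compaction is in flight, later requests are merged into that entry, and `on_compaction_finished` either reschedules the merged work or drops the entry. A minimal, self-contained sketch of that pattern (hypothetical `Status` and waiter types, not the mito2 API):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for mito2's CompactionStatus and waiters.
#[derive(Default)]
struct Status {
    pending: Vec<&'static str>, // waiters merged into the next run
}

#[derive(Default)]
struct Scheduler {
    // A key is present exactly while a compaction for that region is in flight.
    region_status: HashMap<u64, Status>,
}

impl Scheduler {
    fn schedule(&mut self, region_id: u64, waiter: &'static str) {
        if let Some(status) = self.region_status.get_mut(&region_id) {
            // Region is compacting: merge the waiter, do not submit another job.
            status.pending.push(waiter);
            return;
        }
        // First request: mark the region in flight, then submit the job.
        self.region_status.insert(region_id, Status::default());
        println!("submit compaction for region {region_id} ({waiter})");
    }

    fn on_finished(&mut self, region_id: u64) {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map_or(false, |s| !s.pending.is_empty());
        if has_pending {
            // Keep the entry and submit the merged follow-up request.
            let status = self.region_status.get_mut(&region_id).unwrap();
            let waiters = std::mem::take(&mut status.pending);
            println!("submit follow-up for region {region_id} ({waiters:?})");
        } else {
            // Nothing pending: the region is idle again.
            self.region_status.remove(&region_id);
        }
    }
}

fn main() {
    let mut s = Scheduler::default();
    s.schedule(1, "first"); // submits a job
    s.schedule(1, "second"); // merged while the first job runs
    s.on_finished(1); // submits the merged follow-up
    s.on_finished(1); // idle: the entry is removed
    assert!(s.region_status.is_empty());
}
```

Using map presence instead of a flag also removes the failure mode the old code had, where an entry could linger in the map with a stale `compacting = false`.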
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(2, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+    }
 }
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 5f03a3aa5fdb..f840d6aeb2f9 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
         let CompactionRequest {
             current_version,
             access_layer,
-            compaction_time_window,
             request_sender,
             waiters,
             file_purger,
@@ -138,6 +137,9 @@ impl Picker for TwcsPicker {
             expired_ssts.iter().for_each(|f| f.set_compacting(true));
         }

+        let compaction_time_window = current_version
+            .compaction_time_window
+            .map(|window| window.as_secs() as i64);
         let time_window_size = compaction_time_window
             .or(self.time_window_seconds)
             .unwrap_or_else(|| {
@@ -169,7 +171,7 @@ impl Picker for TwcsPicker {
             outputs,
             expired_ssts,
             sst_write_buffer_size: ReadableSize::mb(4),
-            compaction_time_window: None,
+            compaction_time_window: Some(time_window_size),
             request_sender,
             waiters,
             file_purger,
@@ -357,6 +359,9 @@ impl CompactionTask for TwcsCompactionTask {
                 compacted_files: deleted,
                 senders: std::mem::take(&mut self.waiters),
                 file_purger: self.file_purger.clone(),
+                compaction_time_window: self
+                    .compaction_time_window
+                    .map(|seconds| Duration::from_secs(seconds as u64)),
             })
         }
         Err(e) => {
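With the field removed from `CompactionRequest`, the picker now resolves the window itself, in a fixed order: the window persisted in the current version wins, then the configured `time_window_seconds`, and only then is a window inferred from the SSTs. A sketch of that precedence in the same units (seconds as `i64`), with a hypothetical `infer` fallback standing in for the inference logic:

```rust
use std::time::Duration;

// Resolution order used by the picker above; `infer` stands in for the
// expensive inference from SST time ranges.
fn resolve_window_seconds(
    persisted: Option<Duration>, // Version::compaction_time_window
    configured: Option<i64>,     // TwcsPicker::time_window_seconds
    infer: impl FnOnce() -> i64,
) -> i64 {
    persisted
        .map(|window| window.as_secs() as i64)
        .or(configured)
        .unwrap_or_else(infer)
}

fn main() {
    // A persisted window always wins, so later compactions stay stable.
    assert_eq!(
        3600,
        resolve_window_seconds(Some(Duration::from_secs(3600)), Some(7200), || 60)
    );
    // Otherwise fall back to the configured window, then to inference.
    assert_eq!(7200, resolve_window_seconds(None, Some(7200), || 60));
    assert_eq!(60, resolve_window_seconds(None, None, || 60));
}
```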
+    #[serde(with = "humantime_serde")]
+    pub compaction_time_window: Option<Duration>,
 }

 #[derive(Debug, Default)]
@@ -94,6 +99,7 @@ pub struct RegionManifestBuilder {
     flushed_sequence: SequenceNumber,
     manifest_version: ManifestVersion,
     truncated_entry_id: Option<EntryId>,
+    compaction_time_window: Option<Duration>,
 }

 impl RegionManifestBuilder {
@@ -107,6 +113,7 @@ impl RegionManifestBuilder {
                 manifest_version: s.manifest_version,
                 flushed_sequence: s.flushed_sequence,
                 truncated_entry_id: s.truncated_entry_id,
+                compaction_time_window: s.compaction_time_window,
             }
         } else {
             Default::default()
@@ -132,6 +139,9 @@ impl RegionManifestBuilder {
         if let Some(flushed_sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
         }
+        if let Some(window) = edit.compaction_time_window {
+            self.compaction_time_window = Some(window);
+        }
     }

     pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
@@ -156,6 +166,7 @@ impl RegionManifestBuilder {
             flushed_sequence: self.flushed_sequence,
             manifest_version: self.manifest_version,
             truncated_entry_id: self.truncated_entry_id,
+            compaction_time_window: self.compaction_time_window,
         })
     }
 }
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index e1b1c54cd82c..68c7063e1e63 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
         .await
         .unwrap();
     let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
-    let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
+    let expected_json = "{\"size\":846,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
     assert_eq!(expected_json, raw_json);

     // reopen the manager
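The checkpoint test changes only because the serialized manifest gains one field. With `#[serde(with = "humantime_serde")]`, the window is stored in humantime notation, and a `None` serializes as `null`; the string `"compaction_time_window":null,` is 30 bytes, which matches the growth of the expected size from 816 to 846. A minimal round-trip sketch, assuming the `serde`, `serde_json`, and `humantime-serde` crates:

```rust
use std::time::Duration;

use serde::{Deserialize, Serialize};

// Minimal stand-in for the manifest struct above.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Manifest {
    #[serde(with = "humantime_serde")]
    compaction_time_window: Option<Duration>,
}

fn main() {
    // Durations are written in humantime notation (e.g. "1h"), not raw seconds.
    let m = Manifest {
        compaction_time_window: Some(Duration::from_secs(3600)),
    };
    let json = serde_json::to_string(&m).unwrap();
    println!("{json}"); // expected: {"compaction_time_window":"1h"}
    let back: Manifest = serde_json::from_str(&json).unwrap();
    assert_eq!(m, back);

    // `None` round-trips as null; this extra field accounts for the
    // 30-byte growth (816 -> 846) in the checkpoint test above.
    let empty = Manifest {
        compaction_time_window: None,
    };
    assert_eq!(
        r#"{"compaction_time_window":null}"#,
        serde_json::to_string(&empty).unwrap()
    );
}
```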
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index fb6e5f89896b..211cecb75047 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -218,6 +218,7 @@ impl RegionOpener {
             .flushed_entry_id(manifest.flushed_entry_id)
             .flushed_sequence(manifest.flushed_sequence)
             .truncated_entry_id(manifest.truncated_entry_id)
+            .compaction_time_window(manifest.compaction_time_window)
             .options(options)
             .build();
         let flushed_entry_id = version.flushed_entry_id;
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index e88fafedaf08..c7d84fd913df 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -24,6 +24,7 @@
 //! and became invisible between step 1 and 2, so need to acquire version at first.

 use std::sync::{Arc, RwLock};
+use std::time::Duration;

 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::SequenceNumber;
@@ -205,6 +206,8 @@ pub(crate) struct Version {
     ///
     /// Used to check if it is a flush task during the truncating table.
     pub(crate) truncated_entry_id: Option<EntryId>,
+    /// Inferred compaction time window.
+    pub(crate) compaction_time_window: Option<Duration>,
     /// Options of the region.
     pub(crate) options: RegionOptions,
 }
@@ -219,6 +222,7 @@ pub(crate) struct VersionBuilder {
     flushed_entry_id: EntryId,
     flushed_sequence: SequenceNumber,
     truncated_entry_id: Option<EntryId>,
+    compaction_time_window: Option<Duration>,
     options: RegionOptions,
 }

@@ -232,6 +236,7 @@ impl VersionBuilder {
             flushed_entry_id: 0,
             flushed_sequence: 0,
             truncated_entry_id: None,
+            compaction_time_window: None,
             options: RegionOptions::default(),
         }
     }
@@ -245,6 +250,7 @@ impl VersionBuilder {
             flushed_entry_id: version.flushed_entry_id,
             flushed_sequence: version.flushed_sequence,
             truncated_entry_id: version.truncated_entry_id,
+            compaction_time_window: version.compaction_time_window,
             options: version.options.clone(),
         }
     }
@@ -279,6 +285,12 @@ impl VersionBuilder {
         self
     }

+    /// Sets compaction time window.
+    pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
+        self.compaction_time_window = window;
+        self
+    }
+
     /// Sets options.
     pub(crate) fn options(mut self, options: RegionOptions) -> Self {
         self.options = options;
@@ -293,6 +305,9 @@ impl VersionBuilder {
         if let Some(sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(sequence);
         }
+        if let Some(window) = edit.compaction_time_window {
+            self.compaction_time_window = Some(window);
+        }
         if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
             let mut ssts = (*self.ssts).clone();
             ssts.add_files(file_purger, edit.files_to_add.into_iter());
@@ -335,6 +350,7 @@ impl VersionBuilder {
             flushed_entry_id: self.flushed_entry_id,
             flushed_sequence: self.flushed_sequence,
             truncated_entry_id: self.truncated_entry_id,
+            compaction_time_window: self.compaction_time_window,
             options: self.options,
         }
     }
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 1fbcbc962338..33cc121a5a3e 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -16,6 +16,7 @@

 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;

 use api::helper::{
     is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
@@ -640,6 +641,8 @@ pub(crate) struct CompactionFinished {
     pub(crate) senders: Vec<OutputTx>,
     /// File purger for cleaning files on failure.
     pub(crate) file_purger: FilePurgerRef,
+    /// Inferred compaction time window.
+    pub(crate) compaction_time_window: Option<Duration>,
 }

 impl CompactionFinished {
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index 5e1e259e75d9..611371a7ec0b 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -52,6 +52,12 @@ impl SchedulerEnv {
         }
     }

+    /// Sets the scheduler.
+    pub(crate) fn scheduler(mut self, scheduler: SchedulerRef) -> Self {
+        self.scheduler = Some(scheduler);
+        self
+    }
+
     /// Creates a new compaction scheduler.
     pub(crate) fn mock_compaction_scheduler(
         &self,
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 2fee8a987b06..e480b1f146df 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema;
 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
 use store_api::storage::RegionId;

+use crate::manifest::action::RegionEdit;
 use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
 use crate::region::version::{Version, VersionBuilder, VersionControl};
 use crate::sst::file::{FileId, FileMeta};
@@ -113,3 +114,41 @@ impl VersionControlBuilder {
         VersionControl::new(version)
     }
 }
+
+/// Applies an edit that adds mocked L0 files and removes `files_to_remove`.
+/// `files_to_add` is a slice of `(start_ms, end_ms)` time ranges.
+pub(crate) fn apply_edit(
+    version_control: &VersionControl,
+    files_to_add: &[(i64, i64)],
+    files_to_remove: &[FileMeta],
+    purger: FilePurgerRef,
+) {
+    let region_id = version_control.current().version.metadata.region_id;
+    let files_to_add = files_to_add
+        .iter()
+        .map(|(start_ms, end_ms)| {
+            FileMeta {
+                region_id,
+                file_id: FileId::random(),
+                time_range: (
+                    Timestamp::new_millisecond(*start_ms),
+                    Timestamp::new_millisecond(*end_ms),
+                ),
+                level: 0,
+                file_size: 0, // We don't care about file size.
+            }
+        })
+        .collect();
+
+    version_control.apply_edit(
+        RegionEdit {
+            files_to_add,
+            files_to_remove: files_to_remove.to_vec(),
+            compaction_time_window: None,
+            flushed_entry_id: None,
+            flushed_sequence: None,
+        },
+        &[],
+        purger,
+    );
+}
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index 0e0f3a07ef34..79ade0f4dc9f 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -61,7 +61,7 @@ impl<S> RegionWorkerLoop<S> {
         let edit = RegionEdit {
             files_to_add: std::mem::take(&mut request.compaction_outputs),
             files_to_remove: std::mem::take(&mut request.compacted_files),
-            compaction_time_window: None, // TODO(hl): update window maybe
+            compaction_time_window: request.compaction_time_window,
             flushed_entry_id: None,
             flushed_sequence: None,
         };
diff --git a/tests/cases/standalone/common/select/like.result b/tests/cases/standalone/common/select/like.result
index 91b648e1a83f..b47528b80e23 100644
--- a/tests/cases/standalone/common/select/like.result
+++ b/tests/cases/standalone/common/select/like.result
@@ -14,6 +14,7 @@ INSERT INTO TABLE host VALUES

 Affected Rows: 4

+-- SQLNESS SORT_RESULT 3 1
 SELECT * FROM host WHERE host LIKE '%+%';

 +-------------------------+------+-----+
diff --git a/tests/cases/standalone/common/select/like.sql b/tests/cases/standalone/common/select/like.sql
index b4ef76bb93b7..281cef769a36 100644
--- a/tests/cases/standalone/common/select/like.sql
+++ b/tests/cases/standalone/common/select/like.sql
@@ -10,6 +10,7 @@ INSERT INTO TABLE host VALUES
     (2, 'a', 3.0),
     (3, 'c', 4.0);

+-- SQLNESS SORT_RESULT 3 1
 SELECT * FROM host WHERE host LIKE '%+%';

 DROP TABLE host;
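Taken together, the patch closes a persistence loop: the picker resolves a window in seconds, the finished task converts it to a `Duration` for the `RegionEdit`, the manifest records it, and `RegionOpener` feeds it back into the next `Version`, so an inferred window sticks across compactions and restarts. A compact sketch of the conversions along that path (plain functions standing in for the picker, the task, and manifest replay; not the mito2 types):

```rust
use std::time::Duration;

// The window travels through two representations in the patch above:
// i64 seconds inside the picker and task, Option<Duration> in the
// RegionEdit, manifest, and Version.
fn task_window_to_edit(seconds: i64) -> Option<Duration> {
    // TwcsCompactionTask -> CompactionFinished -> RegionEdit (twcs.rs above).
    Some(Duration::from_secs(seconds as u64))
}

fn replayed_window_to_picker(window: Option<Duration>) -> Option<i64> {
    // Version::compaction_time_window -> TwcsPicker (twcs.rs above).
    window.map(|w| w.as_secs() as i64)
}

fn main() {
    // A window picked once (say, 2h) survives the round trip intact,
    // so subsequent compactions and restarts reuse it.
    let picked: i64 = 7200;
    let persisted = task_window_to_edit(picked);
    assert_eq!(Some(picked), replayed_window_to_picker(persisted));
}
```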