diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index cdc2cc5d4b72..c0098319e2aa 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,7 +24,6 @@ use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use snafu::ResultExt; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; @@ -172,7 +171,6 @@ impl Picker for TwcsPicker { request_sender, sender: waiter, file_purger, - last_truncate_manifest_version: current_version.last_truncate_manifest_version, }; Some(Box::new(task)) } @@ -231,7 +229,6 @@ pub(crate) struct TwcsCompactionTask { pub(crate) request_sender: mpsc::Sender, /// Sender that are used to notify waiters waiting for pending compaction tasks. pub sender: Option>>, - pub last_truncate_manifest_version: Option, } impl Debug for TwcsCompactionTask { @@ -358,7 +355,6 @@ impl CompactionTask for TwcsCompactionTask { compacted_files: deleted, sender: self.sender.take(), file_purger: self.file_purger.clone(), - last_truncate_manifest_version: self.last_truncate_manifest_version, }) } Err(e) => { diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 2aca44b73e60..6f3e6c9bb267 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -28,7 +28,7 @@ use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; -use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest}; +use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; use crate::sst::file::{FileId, FileMeta, FileTimeRange}; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; @@ -103,7 +103,7 @@ async fn test_engine_put_data_after_truncate() { }; put_rows(&engine, region_id, rows).await; - // Scan the region.mut + // Scan the region let request = ScanRequest::default(); let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -316,7 +316,6 @@ async fn test_engine_truncate_during_flush() { let current_version = region.version_control.current().version; assert_eq!(current_version.truncate_entry_id, None); - assert_eq!(current_version.last_truncate_manifest_version, None); // Truncate the region. engine @@ -329,6 +328,7 @@ async fn test_engine_truncate_during_flush() { region_id, file_metas: vec![file_meta.clone()], flushed_entry_id, + flushed_sequence: flushed_entry_id, memtables_to_remove: SmallVec::new(), file_purger: region.file_purger.clone(), senders: vec![sender], @@ -365,77 +365,4 @@ async fn test_engine_truncate_during_flush() { let current_version = region.version_control.current().version; assert_eq!(current_version.truncate_entry_id, None); - assert_eq!(current_version.last_truncate_manifest_version, Some(0)); -} - -#[tokio::test] -async fn test_engine_truncate_during_compaction() { - let mut env = TestEnv::with_prefix("truncate-during-compaction"); - let engine = env.create_engine(MitoConfig::default()).await; - - // Create the region. - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let region_dir = request.region_dir.clone(); - - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - let region = engine.get_region(region_id).unwrap(); - - // Create a parquet file. - // Simulate that the `handle_compaction()` function is currently being executed. - let file_id = FileId::random(); - let file_name = format!("{}.parquet", file_id); - let file_meta = FileMeta { - region_id, - file_id, - time_range: FileTimeRange::default(), - level: 0, - file_size: 0, - }; - env.get_object_store() - .unwrap() - .write(&join_path(®ion_dir, &file_name), vec![]) - .await - .unwrap(); - - let (sender, receiver) = oneshot::channel(); - - // Truncate the region. - engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) - .await - .unwrap(); - - // The compaction task is finished, and the `handle_compaction_finished()` is executed. - let finished = CompactionFinished { - region_id, - compaction_outputs: vec![file_meta], - compacted_files: vec![], - sender: Some(sender), - file_purger: region.file_purger.clone(), - last_truncate_manifest_version: None, - }; - - let worker_request = WorkerRequest::Background { - region_id, - notify: BackgroundNotify::CompactionFinished(finished), - }; - - engine - .handle_worker_request(region_id, worker_request) - .await - .unwrap(); - - let _ = receiver.await; - - let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request.clone()).unwrap(); - assert_eq!(0, scanner.num_files()); - - let current_version = region.version_control.current().version; - assert_eq!(current_version.last_truncate_manifest_version, Some(0)); } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 78b2b71954ea..e9c85eea3958 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -149,11 +149,6 @@ impl RegionManifestManager { inner.manifest.clone() } - pub async fn manifest_version(&self) -> ManifestVersion { - let inner = self.inner.read().await; - inner.last_version - } - #[cfg(test)] pub async fn store(&self) -> ManifestObjectStore { let inner = self.inner.read().await; diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 5ead6ab9aabd..60df423e1288 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -25,7 +25,6 @@ use std::sync::{Arc, RwLock}; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; @@ -141,12 +140,7 @@ impl VersionControl { } /// Truncate current version. - pub(crate) fn truncate( - &self, - flushed_entry_id: u64, - manifest_version: ManifestVersion, - memtable_builder: &MemtableBuilderRef, - ) { + pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); @@ -154,7 +148,6 @@ impl VersionControl { VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(flushed_entry_id) .truncate_entry_id(Some(flushed_entry_id)) - .last_truncate_manifest_version(Some(manifest_version)) .build(), ); @@ -205,10 +198,6 @@ pub(crate) struct Version { /// /// Used to check if it is a flush task during the truncation table. pub(crate) truncate_entry_id: Option, - /// Last truncate table `ManifestVersion` - /// - /// Used to check if it is a compaction task during the truncation table. - pub(crate) last_truncate_manifest_version: Option, // TODO(yingwen): RegionOptions. } @@ -222,7 +211,6 @@ pub(crate) struct VersionBuilder { flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, truncate_entry_id: Option, - last_truncate_manifest_version: Option, } impl VersionBuilder { @@ -235,7 +223,6 @@ impl VersionBuilder { flushed_entry_id: 0, flushed_sequence: 0, truncate_entry_id: None, - last_truncate_manifest_version: None, } } @@ -248,7 +235,6 @@ impl VersionBuilder { flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, truncate_entry_id: None, - last_truncate_manifest_version: version.last_truncate_manifest_version, } } @@ -282,15 +268,6 @@ impl VersionBuilder { self } - /// Sets last truncate manifest version. - pub(crate) fn last_truncate_manifest_version( - mut self, - manifest_version: Option, - ) -> Self { - self.last_truncate_manifest_version = manifest_version; - self - } - /// Apply edit to the builder. pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(entry_id) = edit.flushed_entry_id { @@ -341,7 +318,6 @@ impl VersionBuilder { flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, truncate_entry_id: self.truncate_entry_id, - last_truncate_manifest_version: self.last_truncate_manifest_version, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 139e485cfe06..9d5da6643e8a 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,7 +32,6 @@ use datatypes::prelude::DataType; use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::manifest::ManifestVersion; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, @@ -588,8 +587,6 @@ pub(crate) struct CompactionFinished { pub(crate) sender: Option>>, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, - /// Last truncate `ManifestVersion` before requesting compaction task. - pub(crate) last_truncate_manifest_version: Option, } impl CompactionFinished { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 3abad8891ea3..6e55d7f22860 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -15,7 +15,6 @@ //! Utilities for testing. use std::collections::HashMap; -use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -196,10 +195,6 @@ impl TestEnv { RegionManifestManager::open(manifest_opts).await } } - - pub fn get_data_path(&self) -> PathBuf { - self.data_home.path().join("data") - } } /// Builder to mock a [RegionCreateRequest]. diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 6886e1594579..9cecccb41425 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -19,7 +19,7 @@ use store_api::storage::RegionId; use tokio::sync::oneshot; use crate::compaction::CompactionRequest; -use crate::error::{RegionNotFoundSnafu, RegionTruncatingSnafu, Result}; +use crate::error::{RegionNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished}; @@ -59,16 +59,6 @@ impl RegionWorkerLoop { return; }; - let version_data = region.version_control.current(); - if version_data - .version - .last_truncate_manifest_version - .ne(&request.last_truncate_manifest_version) - { - request.on_failure(RegionTruncatingSnafu { region_id }.build()); - return; - } - // Write region edit to manifest. let edit = RegionEdit { files_to_add: std::mem::take(&mut request.compaction_outputs), diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index e16bb28b19a1..455ac59dbe85 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -37,7 +37,6 @@ impl RegionWorkerLoop { self.flush_scheduler.on_region_truncating(region_id); // TODO(DevilExileSu): Notifies compaction scheduler. - let last_manifest_version = region.manifest_manager.manifest_version().await; // Write region truncated to manifest. let truncate = RegionTruncate { @@ -51,7 +50,7 @@ impl RegionWorkerLoop { // Reset region's version and mark all SSTs deleted. region .version_control - .truncate(entry_id, last_manifest_version, &self.memtable_builder); + .truncate(entry_id, &self.memtable_builder); // Make all data obsolete. self.wal.obsolete(region_id, entry_id).await?;