diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 6f3e6c9bb267..f6b70a58b572 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -315,7 +315,7 @@ async fn test_engine_truncate_during_flush() { let flushed_entry_id = region.version_control.current().last_entry_id; let current_version = region.version_control.current().version; - assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.truncated_entry_id, None); // Truncate the region. engine @@ -364,5 +364,5 @@ async fn test_engine_truncate_during_flush() { .unwrap(); let current_version = region.version_control.current().version; - assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.truncated_entry_id, None); } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 29552fbfde81..89dfeccf0de6 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -63,6 +63,7 @@ pub struct RegionRemove { pub struct RegionTruncate { pub region_id: RegionId, pub truncated_entry_id: EntryId, + pub truncated_sequence: SequenceNumber, } /// The region manifest data. @@ -128,7 +129,8 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; - self.flushed_entry_id = truncate.flushed_entry_id; + self.flushed_entry_id = truncate.truncated_entry_id; + self.flushed_sequence = truncate.truncated_sequence; self.files.clear(); } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 60df423e1288..f9fc2ee83727 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -140,14 +140,20 @@ impl VersionControl { } /// Truncate current version. - pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { + pub(crate) fn truncate( + &self, + truncated_entry_id: EntryId, + truncated_sequence: SequenceNumber, + memtable_builder: &MemtableBuilderRef, + ) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) - .flushed_entry_id(flushed_entry_id) - .truncate_entry_id(Some(flushed_entry_id)) + .flushed_entry_id(truncated_entry_id) + .flushed_sequence(truncated_sequence) + .truncated_entry_id(Some(truncated_entry_id)) .build(), ); @@ -197,7 +203,7 @@ pub(crate) struct Version { /// Latest entry id during the truncating table. /// /// Used to check if it is a flush task during the truncation table. - pub(crate) truncate_entry_id: Option, + pub(crate) truncated_entry_id: Option, // TODO(yingwen): RegionOptions. } @@ -210,7 +216,7 @@ pub(crate) struct VersionBuilder { ssts: SstVersionRef, flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, - truncate_entry_id: Option, + truncated_entry_id: Option, } impl VersionBuilder { @@ -222,7 +228,7 @@ impl VersionBuilder { ssts: Arc::new(SstVersion::new()), flushed_entry_id: 0, flushed_sequence: 0, - truncate_entry_id: None, + truncated_entry_id: None, } } @@ -234,7 +240,7 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, - truncate_entry_id: None, + truncated_entry_id: None, } } @@ -263,8 +269,8 @@ impl VersionBuilder { } /// Sets truncated entty id. - pub(crate) fn truncate_entry_id(mut self, entry_id: Option) -> Self { - self.truncate_entry_id = entry_id; + pub(crate) fn truncated_entry_id(mut self, entry_id: Option) -> Self { + self.truncated_entry_id = entry_id; self } @@ -317,7 +323,7 @@ impl VersionBuilder { ssts: self.ssts, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, - truncate_entry_id: self.truncate_entry_id, + truncated_entry_id: self.truncated_entry_id, } } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 455ac59dbe85..42a418893846 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -31,29 +31,32 @@ impl RegionWorkerLoop { info!("Try to truncate region {}", region_id); let version_data = region.version_control.current(); - let entry_id = version_data.last_entry_id; - - // Notifies flush scheduler. - self.flush_scheduler.on_region_truncating(region_id); - - // TODO(DevilExileSu): Notifies compaction scheduler. + let truncated_entry_id = version_data.last_entry_id; + let truncated_sequence = version_data.committed_sequence; // Write region truncated to manifest. let truncate = RegionTruncate { region_id, - flushed_entry_id: entry_id, + truncated_entry_id, + truncated_sequence, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); region.manifest_manager.update(action_list).await?; + // Notifies flush scheduler. + self.flush_scheduler.on_region_truncating(region_id); + // TODO(DevilExileSu): Notifies compaction scheduler. + // Reset region's version and mark all SSTs deleted. - region - .version_control - .truncate(entry_id, &self.memtable_builder); + region.version_control.truncate( + truncated_entry_id, + truncated_sequence, + &self.memtable_builder, + ); // Make all data obsolete. - self.wal.obsolete(region_id, entry_id).await?; + self.wal.obsolete(region_id, truncated_entry_id).await?; info!("Done truncate"); Ok(Output::AffectedRows(0))