From 3ace46e411c6d162cf1c40fad311703d93372085 Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 16:04:45 +0800 Subject: [PATCH] feat: check flush and compaction tasks --- src/mito2/src/compaction/twcs.rs | 4 + src/mito2/src/engine/truncate_test.rs | 99 ++++++++++++++++++++++- src/mito2/src/manifest/action.rs | 5 -- src/mito2/src/manifest/manager.rs | 5 ++ src/mito2/src/region/version.rs | 43 +++++++++- src/mito2/src/request.rs | 3 + src/mito2/src/worker/handle_compaction.rs | 12 ++- src/mito2/src/worker/handle_flush.rs | 12 ++- src/mito2/src/worker/handle_truncate.rs | 5 +- 9 files changed, 168 insertions(+), 20 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index c0098319e2aa..cdc2cc5d4b72 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,6 +24,7 @@ use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use snafu::ResultExt; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; @@ -171,6 +172,7 @@ impl Picker for TwcsPicker { request_sender, sender: waiter, file_purger, + last_truncate_manifest_version: current_version.last_truncate_manifest_version, }; Some(Box::new(task)) } @@ -229,6 +231,7 @@ pub(crate) struct TwcsCompactionTask { pub(crate) request_sender: mpsc::Sender, /// Sender that are used to notify waiters waiting for pending compaction tasks. pub sender: Option>>, + pub last_truncate_manifest_version: Option, } impl Debug for TwcsCompactionTask { @@ -355,6 +358,7 @@ impl CompactionTask for TwcsCompactionTask { compacted_files: deleted, sender: self.sender.take(), file_purger: self.file_purger.clone(), + last_truncate_manifest_version: self.last_truncate_manifest_version, }) } Err(e) => { diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 0632fe7f235b..2aca44b73e60 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -28,7 +28,7 @@ use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; -use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; +use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest}; use crate::sst::file::{FileId, FileMeta, FileTimeRange}; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; @@ -270,7 +270,7 @@ async fn test_engine_truncate_reopen() { #[tokio::test] async fn test_engine_truncate_during_flush() { - let mut env = TestEnv::with_prefix("truncate-reopen"); + let mut env = TestEnv::with_prefix("truncate-during-flush"); let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -314,6 +314,10 @@ async fn test_engine_truncate_during_flush() { let flushed_entry_id = region.version_control.current().last_entry_id; + let current_version = region.version_control.current().version; + assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.last_truncate_manifest_version, None); + // Truncate the region. engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) @@ -340,9 +344,98 @@ async fn test_engine_truncate_during_flush() { .await .unwrap(); - let _ = receiver.await.unwrap(); + let _ = receiver.await; let request = ScanRequest::default(); let scanner = engine.scan(region_id, request.clone()).unwrap(); assert_eq!(0, scanner.num_files()); + + // Put data to the region. + let rows = Rows { + schema: column_schemas, + rows: build_rows(5, 8), + }; + put_rows(&engine, region_id, rows).await; + + // Flush the region. + engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap(); + + let current_version = region.version_control.current().version; + assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.last_truncate_manifest_version, Some(0)); +} + +#[tokio::test] +async fn test_engine_truncate_during_compaction() { + let mut env = TestEnv::with_prefix("truncate-during-compaction"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + + // Create a parquet file. + // Simulate that the `handle_compaction()` function is currently being executed. + let file_id = FileId::random(); + let file_name = format!("{}.parquet", file_id); + let file_meta = FileMeta { + region_id, + file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 0, + }; + env.get_object_store() + .unwrap() + .write(&join_path(®ion_dir, &file_name), vec![]) + .await + .unwrap(); + + let (sender, receiver) = oneshot::channel(); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // The compaction task is finished, and the `handle_compaction_finished()` is executed. + let finished = CompactionFinished { + region_id, + compaction_outputs: vec![file_meta], + compacted_files: vec![], + sender: Some(sender), + file_purger: region.file_purger.clone(), + last_truncate_manifest_version: None, + }; + + let worker_request = WorkerRequest::Background { + region_id, + notify: BackgroundNotify::CompactionFinished(finished), + }; + + engine + .handle_worker_request(region_id, worker_request) + .await + .unwrap(); + + let _ = receiver.await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request.clone()).unwrap(); + assert_eq!(0, scanner.num_files()); + + let current_version = region.version_control.current().version; + assert_eq!(current_version.last_truncate_manifest_version, Some(0)); } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 9df52e65b913..949898236c3b 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; -use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::manifest::ManifestVersion; @@ -130,10 +129,6 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; self.flushed_entry_id = truncate.flushed_entry_id; - info!( - "Truncate region {} to entry {}", - truncate.region_id, truncate.flushed_entry_id - ); self.files.clear(); } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e9c85eea3958..78b2b71954ea 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -149,6 +149,11 @@ impl RegionManifestManager { inner.manifest.clone() } + pub async fn manifest_version(&self) -> ManifestVersion { + let inner = self.inner.read().await; + inner.last_version + } + #[cfg(test)] pub async fn store(&self) -> ManifestObjectStore { let inner = self.inner.read().await; diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 78b3eca6d832..5ead6ab9aabd 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -25,6 +25,7 @@ use std::sync::{Arc, RwLock}; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; @@ -139,14 +140,21 @@ impl VersionControl { version_data.version = new_version; } - /// Reset current version. - pub(crate) fn reset(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { + /// Truncate current version. + pub(crate) fn truncate( + &self, + flushed_entry_id: u64, + manifest_version: ManifestVersion, + memtable_builder: &MemtableBuilderRef, + ) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(flushed_entry_id) + .truncate_entry_id(Some(flushed_entry_id)) + .last_truncate_manifest_version(Some(manifest_version)) .build(), ); @@ -193,6 +201,14 @@ pub(crate) struct Version { pub(crate) flushed_entry_id: EntryId, /// Inclusive max sequence of flushed data. pub(crate) flushed_sequence: SequenceNumber, + /// Latest entry id during the truncating table. + /// + /// Used to check if it is a flush task during the truncation table. + pub(crate) truncate_entry_id: Option, + /// Last truncate table `ManifestVersion` + /// + /// Used to check if it is a compaction task during the truncation table. + pub(crate) last_truncate_manifest_version: Option, // TODO(yingwen): RegionOptions. } @@ -205,6 +221,8 @@ pub(crate) struct VersionBuilder { ssts: SstVersionRef, flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, + truncate_entry_id: Option, + last_truncate_manifest_version: Option, } impl VersionBuilder { @@ -216,6 +234,8 @@ impl VersionBuilder { ssts: Arc::new(SstVersion::new()), flushed_entry_id: 0, flushed_sequence: 0, + truncate_entry_id: None, + last_truncate_manifest_version: None, } } @@ -227,6 +247,8 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, + truncate_entry_id: None, + last_truncate_manifest_version: version.last_truncate_manifest_version, } } @@ -254,6 +276,21 @@ impl VersionBuilder { self } + /// Sets truncated entty id. + pub(crate) fn truncate_entry_id(mut self, entry_id: Option) -> Self { + self.truncate_entry_id = entry_id; + self + } + + /// Sets last truncate manifest version. + pub(crate) fn last_truncate_manifest_version( + mut self, + manifest_version: Option, + ) -> Self { + self.last_truncate_manifest_version = manifest_version; + self + } + /// Apply edit to the builder. pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(entry_id) = edit.flushed_entry_id { @@ -303,6 +340,8 @@ impl VersionBuilder { ssts: self.ssts, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, + truncate_entry_id: self.truncate_entry_id, + last_truncate_manifest_version: self.last_truncate_manifest_version, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 9d5da6643e8a..139e485cfe06 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,6 +32,7 @@ use datatypes::prelude::DataType; use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::manifest::ManifestVersion; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, @@ -587,6 +588,8 @@ pub(crate) struct CompactionFinished { pub(crate) sender: Option>>, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, + /// Last truncate `ManifestVersion` before requesting compaction task. + pub(crate) last_truncate_manifest_version: Option, } impl CompactionFinished { diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 9cecccb41425..6886e1594579 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -19,7 +19,7 @@ use store_api::storage::RegionId; use tokio::sync::oneshot; use crate::compaction::CompactionRequest; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::{RegionNotFoundSnafu, RegionTruncatingSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished}; @@ -59,6 +59,16 @@ impl RegionWorkerLoop { return; }; + let version_data = region.version_control.current(); + if version_data + .version + .last_truncate_manifest_version + .ne(&request.last_truncate_manifest_version) + { + request.on_failure(RegionTruncatingSnafu { region_id }.build()); + return; + } + // Write region edit to manifest. let edit = RegionEdit { files_to_add: std::mem::take(&mut request.compaction_outputs), diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index aec4dbb20cd7..5659f7156582 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -43,13 +43,11 @@ impl RegionWorkerLoop { // The flush task before truncating the region. let version_data = region.version_control.current(); - if version_data.version.flushed_entry_id >= request.flushed_entry_id { - info!( - "Truncate region {} to {}", - region_id, request.flushed_entry_id - ); - request.on_failure(RegionTruncatingSnafu { region_id }.build()); - return; + if let Some(truncate_entry_id) = version_data.version.truncate_entry_id { + if truncate_entry_id >= request.flushed_entry_id { + request.on_failure(RegionTruncatingSnafu { region_id }.build()); + return; + } } // Write region edit to manifest. diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index cb4b18dcc091..e16bb28b19a1 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -36,7 +36,8 @@ impl RegionWorkerLoop { // Notifies flush scheduler. self.flush_scheduler.on_region_truncating(region_id); - // TODO(DevilExileSu): Consider compaction tasks during truncate. + // TODO(DevilExileSu): Notifies compaction scheduler. + let last_manifest_version = region.manifest_manager.manifest_version().await; // Write region truncated to manifest. let truncate = RegionTruncate { @@ -50,7 +51,7 @@ impl RegionWorkerLoop { // Reset region's version and mark all SSTs deleted. region .version_control - .reset(entry_id, &self.memtable_builder); + .truncate(entry_id, last_manifest_version, &self.memtable_builder); // Make all data obsolete. self.wal.obsolete(region_id, entry_id).await?;