Skip to content

Commit

Permalink
feat: check flush and compaction tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 12, 2023
1 parent 4cd7aaa commit 3ace46e
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 20 deletions.
4 changes: 4 additions & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use snafu::ResultExt;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -171,6 +172,7 @@ impl Picker for TwcsPicker {
request_sender,
sender: waiter,
file_purger,
last_truncate_manifest_version: current_version.last_truncate_manifest_version,
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -229,6 +231,7 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: Option<Sender<error::Result<Output>>>,
pub last_truncate_manifest_version: Option<ManifestVersion>,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -355,6 +358,7 @@ impl CompactionTask for TwcsCompactionTask {
compacted_files: deleted,
sender: self.sender.take(),
file_purger: self.file_purger.clone(),
last_truncate_manifest_version: self.last_truncate_manifest_version,
})
}
Err(e) => {
Expand Down
99 changes: 96 additions & 3 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::oneshot;

use super::ScanRequest;
use crate::config::MitoConfig;
use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest};
use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest};
use crate::sst::file::{FileId, FileMeta, FileTimeRange};
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

Expand Down Expand Up @@ -270,7 +270,7 @@ async fn test_engine_truncate_reopen() {

#[tokio::test]
async fn test_engine_truncate_during_flush() {
let mut env = TestEnv::with_prefix("truncate-reopen");
let mut env = TestEnv::with_prefix("truncate-during-flush");
let engine = env.create_engine(MitoConfig::default()).await;

// Create the region.
Expand Down Expand Up @@ -314,6 +314,10 @@ async fn test_engine_truncate_during_flush() {

let flushed_entry_id = region.version_control.current().last_entry_id;

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.last_truncate_manifest_version, None);

// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
Expand All @@ -340,9 +344,98 @@ async fn test_engine_truncate_during_flush() {
.await
.unwrap();

let _ = receiver.await.unwrap();
let _ = receiver.await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());

// Put data to the region.
let rows = Rows {
schema: column_schemas,
rows: build_rows(5, 8),
};
put_rows(&engine, region_id, rows).await;

// Flush the region.
engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap();

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.last_truncate_manifest_version, Some(0));
}

#[tokio::test]
async fn test_engine_truncate_during_compaction() {
let mut env = TestEnv::with_prefix("truncate-during-compaction");
let engine = env.create_engine(MitoConfig::default()).await;

// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();

// Create a parquet file.
// Simulate that the `handle_compaction()` function is currently being executed.
let file_id = FileId::random();
let file_name = format!("{}.parquet", file_id);
let file_meta = FileMeta {
region_id,
file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
};
env.get_object_store()
.unwrap()
.write(&join_path(&region_dir, &file_name), vec![])
.await
.unwrap();

let (sender, receiver) = oneshot::channel();

// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();

// The compaction task is finished, and the `handle_compaction_finished()` is executed.
let finished = CompactionFinished {
region_id,
compaction_outputs: vec![file_meta],
compacted_files: vec![],
sender: Some(sender),
file_purger: region.file_purger.clone(),
last_truncate_manifest_version: None,
};

let worker_request = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::CompactionFinished(finished),
};

engine
.handle_worker_request(region_id, worker_request)
.await
.unwrap();

let _ = receiver.await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());

let current_version = region.version_control.current().version;
assert_eq!(current_version.last_truncate_manifest_version, Some(0));
}
5 changes: 0 additions & 5 deletions src/mito2/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use std::collections::HashMap;

use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::manifest::ManifestVersion;
Expand Down Expand Up @@ -130,10 +129,6 @@ impl RegionManifestBuilder {
pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
self.manifest_version = manifest_version;
self.flushed_entry_id = truncate.flushed_entry_id;
info!(
"Truncate region {} to entry {}",
truncate.region_id, truncate.flushed_entry_id
);
self.files.clear();
}

Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ impl RegionManifestManager {
inner.manifest.clone()
}

pub async fn manifest_version(&self) -> ManifestVersion {
let inner = self.inner.read().await;
inner.last_version
}

#[cfg(test)]
pub async fn store(&self) -> ManifestObjectStore {
let inner = self.inner.read().await;
Expand Down
43 changes: 41 additions & 2 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use std::sync::{Arc, RwLock};

use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

Expand Down Expand Up @@ -139,14 +140,21 @@ impl VersionControl {
version_data.version = new_version;
}

/// Reset current version.
pub(crate) fn reset(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) {
/// Truncate current version.
pub(crate) fn truncate(
&self,
flushed_entry_id: u64,
manifest_version: ManifestVersion,
memtable_builder: &MemtableBuilderRef,
) {
let version = self.current().version;

let new_mutable = memtable_builder.build(&version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(flushed_entry_id)
.truncate_entry_id(Some(flushed_entry_id))
.last_truncate_manifest_version(Some(manifest_version))
.build(),
);

Expand Down Expand Up @@ -193,6 +201,14 @@ pub(crate) struct Version {
pub(crate) flushed_entry_id: EntryId,
/// Inclusive max sequence of flushed data.
pub(crate) flushed_sequence: SequenceNumber,
/// Latest entry id during the truncating table.
///
/// Used to check if it is a flush task during the truncation table.
pub(crate) truncate_entry_id: Option<EntryId>,
/// Last truncate table `ManifestVersion`
///
/// Used to check if it is a compaction task during the truncation table.
pub(crate) last_truncate_manifest_version: Option<ManifestVersion>,
// TODO(yingwen): RegionOptions.
}

Expand All @@ -205,6 +221,8 @@ pub(crate) struct VersionBuilder {
ssts: SstVersionRef,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncate_entry_id: Option<EntryId>,
last_truncate_manifest_version: Option<ManifestVersion>,
}

impl VersionBuilder {
Expand All @@ -216,6 +234,8 @@ impl VersionBuilder {
ssts: Arc::new(SstVersion::new()),
flushed_entry_id: 0,
flushed_sequence: 0,
truncate_entry_id: None,
last_truncate_manifest_version: None,
}
}

Expand All @@ -227,6 +247,8 @@ impl VersionBuilder {
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncate_entry_id: None,
last_truncate_manifest_version: version.last_truncate_manifest_version,
}
}

Expand Down Expand Up @@ -254,6 +276,21 @@ impl VersionBuilder {
self
}

/// Sets truncated entty id.
pub(crate) fn truncate_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncate_entry_id = entry_id;
self
}

/// Sets last truncate manifest version.
pub(crate) fn last_truncate_manifest_version(
mut self,
manifest_version: Option<ManifestVersion>,
) -> Self {
self.last_truncate_manifest_version = manifest_version;
self
}

/// Apply edit to the builder.
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(entry_id) = edit.flushed_entry_id {
Expand Down Expand Up @@ -303,6 +340,8 @@ impl VersionBuilder {
ssts: self.ssts,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncate_entry_id: self.truncate_entry_id,
last_truncate_manifest_version: self.last_truncate_manifest_version,
}
}
}
3 changes: 3 additions & 0 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datatypes::prelude::DataType;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
Expand Down Expand Up @@ -587,6 +588,8 @@ pub(crate) struct CompactionFinished {
pub(crate) sender: Option<oneshot::Sender<Result<Output>>>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
/// Last truncate `ManifestVersion` before requesting compaction task.
pub(crate) last_truncate_manifest_version: Option<ManifestVersion>,
}

impl CompactionFinished {
Expand Down
12 changes: 11 additions & 1 deletion src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use store_api::storage::RegionId;
use tokio::sync::oneshot;

use crate::compaction::CompactionRequest;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::error::{RegionNotFoundSnafu, RegionTruncatingSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished};
Expand Down Expand Up @@ -59,6 +59,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};

let version_data = region.version_control.current();
if version_data
.version
.last_truncate_manifest_version
.ne(&request.last_truncate_manifest_version)
{
request.on_failure(RegionTruncatingSnafu { region_id }.build());
return;
}

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
Expand Down
12 changes: 5 additions & 7 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {

// The flush task before truncating the region.
let version_data = region.version_control.current();
if version_data.version.flushed_entry_id >= request.flushed_entry_id {
info!(
"Truncate region {} to {}",
region_id, request.flushed_entry_id
);
request.on_failure(RegionTruncatingSnafu { region_id }.build());
return;
if let Some(truncate_entry_id) = version_data.version.truncate_entry_id {
if truncate_entry_id >= request.flushed_entry_id {
request.on_failure(RegionTruncatingSnafu { region_id }.build());
return;
}
}

// Write region edit to manifest.
Expand Down
5 changes: 3 additions & 2 deletions src/mito2/src/worker/handle_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Notifies flush scheduler.
self.flush_scheduler.on_region_truncating(region_id);

// TODO(DevilExileSu): Consider compaction tasks during truncate.
// TODO(DevilExileSu): Notifies compaction scheduler.
let last_manifest_version = region.manifest_manager.manifest_version().await;

// Write region truncated to manifest.
let truncate = RegionTruncate {
Expand All @@ -50,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Reset region's version and mark all SSTs deleted.
region
.version_control
.reset(entry_id, &self.memtable_builder);
.truncate(entry_id, last_manifest_version, &self.memtable_builder);

// Make all data obsolete.
self.wal.obsolete(region_id, entry_id).await?;
Expand Down

0 comments on commit 3ace46e

Please sign in to comment.