Skip to content

Commit

Permalink
chore: remove useless changes
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 12, 2023
1 parent 231ff08 commit af959e0
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 131 deletions.
4 changes: 0 additions & 4 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use snafu::ResultExt;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -172,7 +171,6 @@ impl Picker for TwcsPicker {
request_sender,
sender: waiter,
file_purger,
last_truncate_manifest_version: current_version.last_truncate_manifest_version,
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -231,7 +229,6 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: Option<Sender<error::Result<Output>>>,
pub last_truncate_manifest_version: Option<ManifestVersion>,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -358,7 +355,6 @@ impl CompactionTask for TwcsCompactionTask {
compacted_files: deleted,
sender: self.sender.take(),
file_purger: self.file_purger.clone(),
last_truncate_manifest_version: self.last_truncate_manifest_version,
})
}
Err(e) => {
Expand Down
79 changes: 3 additions & 76 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::oneshot;

use super::ScanRequest;
use crate::config::MitoConfig;
use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest};
use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest};
use crate::sst::file::{FileId, FileMeta, FileTimeRange};
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

Expand Down Expand Up @@ -103,7 +103,7 @@ async fn test_engine_put_data_after_truncate() {
};
put_rows(&engine, region_id, rows).await;

// Scan the region.mut
// Scan the region
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -316,7 +316,6 @@ async fn test_engine_truncate_during_flush() {

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.last_truncate_manifest_version, None);

// Truncate the region.
engine
Expand All @@ -329,6 +328,7 @@ async fn test_engine_truncate_during_flush() {
region_id,
file_metas: vec![file_meta.clone()],
flushed_entry_id,
flushed_sequence: flushed_entry_id,
memtables_to_remove: SmallVec::new(),
file_purger: region.file_purger.clone(),
senders: vec![sender],
Expand Down Expand Up @@ -365,77 +365,4 @@ async fn test_engine_truncate_during_flush() {

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.last_truncate_manifest_version, Some(0));
}

#[tokio::test]
async fn test_engine_truncate_during_compaction() {
let mut env = TestEnv::with_prefix("truncate-during-compaction");
let engine = env.create_engine(MitoConfig::default()).await;

// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();

// Create a parquet file.
// Simulate that the `handle_compaction()` function is currently being executed.
let file_id = FileId::random();
let file_name = format!("{}.parquet", file_id);
let file_meta = FileMeta {
region_id,
file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
};
env.get_object_store()
.unwrap()
.write(&join_path(&region_dir, &file_name), vec![])
.await
.unwrap();

let (sender, receiver) = oneshot::channel();

// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();

// The compaction task is finished, and the `handle_compaction_finished()` is executed.
let finished = CompactionFinished {
region_id,
compaction_outputs: vec![file_meta],
compacted_files: vec![],
sender: Some(sender),
file_purger: region.file_purger.clone(),
last_truncate_manifest_version: None,
};

let worker_request = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::CompactionFinished(finished),
};

engine
.handle_worker_request(region_id, worker_request)
.await
.unwrap();

let _ = receiver.await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());

let current_version = region.version_control.current().version;
assert_eq!(current_version.last_truncate_manifest_version, Some(0));
}
5 changes: 0 additions & 5 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ impl RegionManifestManager {
inner.manifest.clone()
}

pub async fn manifest_version(&self) -> ManifestVersion {
let inner = self.inner.read().await;
inner.last_version
}

#[cfg(test)]
pub async fn store(&self) -> ManifestObjectStore {
let inner = self.inner.read().await;
Expand Down
26 changes: 1 addition & 25 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
use std::sync::{Arc, RwLock};

use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

Expand Down Expand Up @@ -141,20 +140,14 @@ impl VersionControl {
}

/// Truncate current version.
pub(crate) fn truncate(
&self,
flushed_entry_id: u64,
manifest_version: ManifestVersion,
memtable_builder: &MemtableBuilderRef,
) {
pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;

let new_mutable = memtable_builder.build(&version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(flushed_entry_id)
.truncate_entry_id(Some(flushed_entry_id))
.last_truncate_manifest_version(Some(manifest_version))
.build(),
);

Expand Down Expand Up @@ -205,10 +198,6 @@ pub(crate) struct Version {
///
/// Used to check if it is a flush task during the truncation table.
pub(crate) truncate_entry_id: Option<EntryId>,
/// Last truncate table `ManifestVersion`
///
/// Used to check if it is a compaction task during the truncation table.
pub(crate) last_truncate_manifest_version: Option<ManifestVersion>,
// TODO(yingwen): RegionOptions.
}

Expand All @@ -222,7 +211,6 @@ pub(crate) struct VersionBuilder {
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncate_entry_id: Option<EntryId>,
last_truncate_manifest_version: Option<ManifestVersion>,
}

impl VersionBuilder {
Expand All @@ -235,7 +223,6 @@ impl VersionBuilder {
flushed_entry_id: 0,
flushed_sequence: 0,
truncate_entry_id: None,
last_truncate_manifest_version: None,
}
}

Expand All @@ -248,7 +235,6 @@ impl VersionBuilder {
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncate_entry_id: None,
last_truncate_manifest_version: version.last_truncate_manifest_version,
}
}

Expand Down Expand Up @@ -282,15 +268,6 @@ impl VersionBuilder {
self
}

/// Sets last truncate manifest version.
pub(crate) fn last_truncate_manifest_version(
mut self,
manifest_version: Option<ManifestVersion>,
) -> Self {
self.last_truncate_manifest_version = manifest_version;
self
}

/// Apply edit to the builder.
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(entry_id) = edit.flushed_entry_id {
Expand Down Expand Up @@ -341,7 +318,6 @@ impl VersionBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncate_entry_id: self.truncate_entry_id,
last_truncate_manifest_version: self.last_truncate_manifest_version,
}
}
}
3 changes: 0 additions & 3 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datatypes::prelude::DataType;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
Expand Down Expand Up @@ -588,8 +587,6 @@ pub(crate) struct CompactionFinished {
pub(crate) sender: Option<oneshot::Sender<Result<Output>>>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
/// Last truncate `ManifestVersion` before requesting compaction task.
pub(crate) last_truncate_manifest_version: Option<ManifestVersion>,
}

impl CompactionFinished {
Expand Down
5 changes: 0 additions & 5 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Utilities for testing.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -196,10 +195,6 @@ impl TestEnv {
RegionManifestManager::open(manifest_opts).await
}
}

pub fn get_data_path(&self) -> PathBuf {
self.data_home.path().join("data")
}
}

/// Builder to mock a [RegionCreateRequest].
Expand Down
12 changes: 1 addition & 11 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use store_api::storage::RegionId;
use tokio::sync::oneshot;

use crate::compaction::CompactionRequest;
use crate::error::{RegionNotFoundSnafu, RegionTruncatingSnafu, Result};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished};
Expand Down Expand Up @@ -59,16 +59,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};

let version_data = region.version_control.current();
if version_data
.version
.last_truncate_manifest_version
.ne(&request.last_truncate_manifest_version)
{
request.on_failure(RegionTruncatingSnafu { region_id }.build());
return;
}

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/worker/handle_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.flush_scheduler.on_region_truncating(region_id);

// TODO(DevilExileSu): Notifies compaction scheduler.
let last_manifest_version = region.manifest_manager.manifest_version().await;

// Write region truncated to manifest.
let truncate = RegionTruncate {
Expand All @@ -51,7 +50,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Reset region's version and mark all SSTs deleted.
region
.version_control
.truncate(entry_id, last_manifest_version, &self.memtable_builder);
.truncate(entry_id, &self.memtable_builder);

// Make all data obsolete.
self.wal.obsolete(region_id, entry_id).await?;
Expand Down

0 comments on commit af959e0

Please sign in to comment.