Skip to content

Commit

Permalink
chore: CR, consider sequence number
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 12, 2023
1 parent 7b3a175 commit a259342
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async fn test_engine_truncate_during_flush() {
let flushed_entry_id = region.version_control.current().last_entry_id;

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.truncated_entry_id, None);

// Truncate the region.
engine
Expand Down Expand Up @@ -364,5 +364,5 @@ async fn test_engine_truncate_during_flush() {
.unwrap();

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncate_entry_id, None);
assert_eq!(current_version.truncated_entry_id, None);
}
4 changes: 3 additions & 1 deletion src/mito2/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct RegionRemove {
pub struct RegionTruncate {
pub region_id: RegionId,
pub truncated_entry_id: EntryId,
pub truncated_sequence: SequenceNumber,
}

/// The region manifest data.
Expand Down Expand Up @@ -128,7 +129,8 @@ impl RegionManifestBuilder {

pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
self.manifest_version = manifest_version;
self.flushed_entry_id = truncate.flushed_entry_id;
self.flushed_entry_id = truncate.truncated_entry_id;
self.flushed_sequence = truncate.truncated_sequence;
self.files.clear();
}

Expand Down
26 changes: 16 additions & 10 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,20 @@ impl VersionControl {
}

/// Truncate current version.
pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) {
pub(crate) fn truncate(
&self,
truncated_entry_id: EntryId,
truncated_sequence: SequenceNumber,
memtable_builder: &MemtableBuilderRef,
) {
let version = self.current().version;

let new_mutable = memtable_builder.build(&version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(flushed_entry_id)
.truncate_entry_id(Some(flushed_entry_id))
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
.build(),
);

Expand Down Expand Up @@ -197,7 +203,7 @@ pub(crate) struct Version {
/// Latest entry id during the truncating table.
///
/// Used to check if it is a flush task during the truncation table.
pub(crate) truncate_entry_id: Option<EntryId>,
pub(crate) truncated_entry_id: Option<EntryId>,
// TODO(yingwen): RegionOptions.
}

Expand All @@ -210,7 +216,7 @@ pub(crate) struct VersionBuilder {
ssts: SstVersionRef,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncate_entry_id: Option<EntryId>,
truncated_entry_id: Option<EntryId>,
}

impl VersionBuilder {
Expand All @@ -222,7 +228,7 @@ impl VersionBuilder {
ssts: Arc::new(SstVersion::new()),
flushed_entry_id: 0,
flushed_sequence: 0,
truncate_entry_id: None,
truncated_entry_id: None,
}
}

Expand All @@ -234,7 +240,7 @@ impl VersionBuilder {
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncate_entry_id: None,
truncated_entry_id: None,
}
}

Expand Down Expand Up @@ -263,8 +269,8 @@ impl VersionBuilder {
}

/// Sets truncated entty id.
pub(crate) fn truncate_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncate_entry_id = entry_id;
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id;
self
}

Expand Down Expand Up @@ -317,7 +323,7 @@ impl VersionBuilder {
ssts: self.ssts,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncate_entry_id: self.truncate_entry_id,
truncated_entry_id: self.truncated_entry_id,
}
}
}
25 changes: 14 additions & 11 deletions src/mito2/src/worker/handle_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,32 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Try to truncate region {}", region_id);

let version_data = region.version_control.current();
let entry_id = version_data.last_entry_id;

// Notifies flush scheduler.
self.flush_scheduler.on_region_truncating(region_id);

// TODO(DevilExileSu): Notifies compaction scheduler.
let truncated_entry_id = version_data.last_entry_id;
let truncated_sequence = version_data.committed_sequence;

// Write region truncated to manifest.
let truncate = RegionTruncate {
region_id,
flushed_entry_id: entry_id,
truncated_entry_id,
truncated_sequence,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
region.manifest_manager.update(action_list).await?;

// Notifies flush scheduler.
self.flush_scheduler.on_region_truncating(region_id);
// TODO(DevilExileSu): Notifies compaction scheduler.

// Reset region's version and mark all SSTs deleted.
region
.version_control
.truncate(entry_id, &self.memtable_builder);
region.version_control.truncate(
truncated_entry_id,
truncated_sequence,
&self.memtable_builder,
);

// Make all data obsolete.
self.wal.obsolete(region_id, entry_id).await?;
self.wal.obsolete(region_id, truncated_entry_id).await?;
info!("Done truncate");

Ok(Output::AffectedRows(0))
Expand Down

0 comments on commit a259342

Please sign in to comment.