Skip to content

Commit

Permalink
feat: update partition duration of memtable using compaction window
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Dec 18, 2024
1 parent 548e198 commit 69a79fb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for MemtableConfig {
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains. It is None if
/// The inclusive time range that this memtable contains. It is None if
/// and only if the memtable is empty.
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
Expand Down
32 changes: 28 additions & 4 deletions src/mito2/src/memtable/time_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ impl TimePartitions {
Ok(())
}

/// Forks latest partition.
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
/// Forks latest partition and updates the partition duration.
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
let mut inner = self.inner.lock().unwrap();
let latest_part = inner
.parts
Expand All @@ -178,17 +178,31 @@ impl TimePartitions {
.cloned();

let Some(old_part) = latest_part else {
// If there is no partition, then we create a new partition with the new duration.
return Self::new(
metadata.clone(),
self.builder.clone(),
inner.next_memtable_id,
self.part_duration,
part_duration,
);
};

let old_stats = old_part.memtable.stats();
// Use the max timestamp to compute the new time range for the memtable.
// If `part_duration` is None, the new range will be None.
let new_time_range =
old_stats
.time_range()
.zip(part_duration)
.and_then(|(range, bucket)| {
partition_start_timestamp(range.1, bucket)
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
});
// Forks the latest partition, but compute the time range based on the new duration.
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
let new_part = TimePartition {
memtable,
time_range: old_part.time_range,
time_range: new_time_range,
};
Self {
inner: Mutex::new(PartitionsInner::with_partition(
Expand Down Expand Up @@ -238,6 +252,16 @@ impl TimePartitions {
inner.next_memtable_id
}

/// Creates a new empty partition list from this list and a part_duration.
pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
Self::new(
self.metadata.clone(),
self.builder.clone(),
self.next_memtable_id(),
part_duration,
)
}

/// Returns all partitions.
fn list_partitions(&self) -> PartitionVec {
let inner = self.inner.lock().unwrap();
Expand Down
25 changes: 20 additions & 5 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Memtable version.
use std::sync::Arc;
use std::time::Duration;

use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
Expand Down Expand Up @@ -65,27 +66,41 @@ impl MemtableVersion {
/// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
/// memtable.
///
/// It will switch to use the `time_window` provided.
///
/// Returns `None` if the mutable memtable is empty.
pub(crate) fn freeze_mutable(
&self,
metadata: &RegionMetadataRef,
time_window: Option<Duration>,
) -> Result<Option<MemtableVersion>> {
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return Ok(None);
// No need to freeze the mutable memtable, but we need to check the time window.
if self.mutable.part_duration() == time_window {
// If the time window is the same, we don't need to update it.
return Ok(None);
}

// Update the time window.
let mutable = self.mutable.new_with_part_duration(time_window);
return Ok(Some(MemtableVersion {
mutable: Arc::new(mutable),
immutables: self.immutables.clone(),
}));
}

// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.freeze()?;
// Fork the memtable.
let mutable = Arc::new(self.mutable.fork(metadata));
let mutable = Arc::new(self.mutable.fork(metadata, time_window));

// Pushes the mutable memtable to immutable list.
let mut immutables =
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables_to_small_vec(&mut immutables);
immutables.extend(self.immutables.iter().cloned());
// Pushes the mutable memtable to immutable list.
self.mutable.list_memtables_to_small_vec(&mut immutables);

Ok(Some(MemtableVersion {
mutable,
immutables,
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ impl VersionControl {
/// Freezes the mutable memtable if it is not empty.
pub(crate) fn freeze_mutable(&self) -> Result<()> {
let version = self.current().version;
let time_window = version.compaction_time_window;

let Some(new_memtables) = version.memtables.freeze_mutable(&version.metadata)? else {
let Some(new_memtables) = version
.memtables
.freeze_mutable(&version.metadata, time_window)?
else {
return Ok(());
};

Expand Down

0 comments on commit 69a79fb

Please sign in to comment.