diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 6adc6eb96aec..942d77a209d2 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -70,7 +70,7 @@ impl Default for MemtableConfig { pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. estimated_bytes: usize, - /// The time range that this memtable contains. It is None if + /// The inclusive time range that this memtable contains. It is None if /// and only if the memtable is empty. time_range: Option<(Timestamp, Timestamp)>, /// Total rows in memtable diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 7fa03ae1bed4..21f0dd645977 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -168,8 +168,8 @@ impl TimePartitions { Ok(()) } - /// Forks latest partition. - pub fn fork(&self, metadata: &RegionMetadataRef) -> Self { + /// Forks latest partition and updates the partition duration. + pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option) -> Self { let mut inner = self.inner.lock().unwrap(); let latest_part = inner .parts @@ -178,17 +178,31 @@ impl TimePartitions { .cloned(); let Some(old_part) = latest_part else { + // If there is no partition, then we create a new partition with the new duration. return Self::new( metadata.clone(), self.builder.clone(), inner.next_memtable_id, - self.part_duration, + part_duration, ); }; + + let old_stats = old_part.memtable.stats(); + // Use the max timestamp to compute the new time range for the memtable. + // If `part_duration` is None, the new range will be None. + let new_time_range = + old_stats + .time_range() + .zip(part_duration) + .and_then(|(range, bucket)| { + partition_start_timestamp(range.1, bucket) + .and_then(|start| PartTimeRange::from_start_duration(start, bucket)) + }); + // Forks the latest partition, but compute the time range based on the new duration. let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata); let new_part = TimePartition { memtable, - time_range: old_part.time_range, + time_range: new_time_range, }; Self { inner: Mutex::new(PartitionsInner::with_partition( @@ -238,6 +252,16 @@ impl TimePartitions { inner.next_memtable_id } + /// Creates a new empty partition list from this list and a part_duration. + pub(crate) fn new_with_part_duration(&self, part_duration: Option) -> Self { + Self::new( + self.metadata.clone(), + self.builder.clone(), + self.next_memtable_id(), + part_duration, + ) + } + /// Returns all partitions. fn list_partitions(&self) -> PartitionVec { let inner = self.inner.lock().unwrap(); diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 1c7f2b7d4a25..78f8d1347522 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -15,6 +15,7 @@ //! Memtable version. use std::sync::Arc; +use std::time::Duration; use smallvec::SmallVec; use store_api::metadata::RegionMetadataRef; @@ -65,27 +66,41 @@ impl MemtableVersion { /// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable /// memtable. /// + /// It will switch to use the `time_window` provided. + /// /// Returns `None` if the mutable memtable is empty. pub(crate) fn freeze_mutable( &self, metadata: &RegionMetadataRef, + time_window: Option, ) -> Result> { if self.mutable.is_empty() { - // No need to freeze the mutable memtable. - return Ok(None); + // No need to freeze the mutable memtable, but we need to check the time window. + if self.mutable.part_duration() == time_window { + // If the time window is the same, we don't need to update it. + return Ok(None); + } + + // Update the time window. + let mutable = self.mutable.new_with_part_duration(time_window); + return Ok(Some(MemtableVersion { + mutable: Arc::new(mutable), + immutables: self.immutables.clone(), + })); } // Marks the mutable memtable as immutable so it can free the memory usage from our // soft limit. self.mutable.freeze()?; // Fork the memtable. - let mutable = Arc::new(self.mutable.fork(metadata)); + let mutable = Arc::new(self.mutable.fork(metadata, time_window)); - // Pushes the mutable memtable to immutable list. let mut immutables = SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions()); - self.mutable.list_memtables_to_small_vec(&mut immutables); immutables.extend(self.immutables.iter().cloned()); + // Pushes the mutable memtable to immutable list. + self.mutable.list_memtables_to_small_vec(&mut immutables); + Ok(Some(MemtableVersion { mutable, immutables, diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 8e78fa4ab351..188c314837c0 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -80,8 +80,12 @@ impl VersionControl { /// Freezes the mutable memtable if it is not empty. pub(crate) fn freeze_mutable(&self) -> Result<()> { let version = self.current().version; + let time_window = version.compaction_time_window; - let Some(new_memtables) = version.memtables.freeze_mutable(&version.metadata)? else { + let Some(new_memtables) = version + .memtables + .freeze_mutable(&version.metadata, time_window)? + else { return Ok(()); };