From 69a79fb05ee36383bb4bd80243b39ce273b9f781 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 18 Dec 2024 21:55:30 +0800 Subject: [PATCH 1/5] feat: update partition duration of memtable using compaction window --- src/mito2/src/memtable.rs | 2 +- src/mito2/src/memtable/time_partition.rs | 32 +++++++++++++++++++++--- src/mito2/src/memtable/version.rs | 25 ++++++++++++++---- src/mito2/src/region/version.rs | 6 ++++- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 6adc6eb96aec..942d77a209d2 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -70,7 +70,7 @@ impl Default for MemtableConfig { pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. estimated_bytes: usize, - /// The time range that this memtable contains. It is None if + /// The inclusive time range that this memtable contains. It is None if /// and only if the memtable is empty. time_range: Option<(Timestamp, Timestamp)>, /// Total rows in memtable diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 7fa03ae1bed4..21f0dd645977 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -168,8 +168,8 @@ impl TimePartitions { Ok(()) } - /// Forks latest partition. - pub fn fork(&self, metadata: &RegionMetadataRef) -> Self { + /// Forks latest partition and updates the partition duration. + pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option) -> Self { let mut inner = self.inner.lock().unwrap(); let latest_part = inner .parts @@ -178,17 +178,31 @@ impl TimePartitions { .cloned(); let Some(old_part) = latest_part else { + // If there is no partition, then we create a new partition with the new duration. return Self::new( metadata.clone(), self.builder.clone(), inner.next_memtable_id, - self.part_duration, + part_duration, ); }; + + let old_stats = old_part.memtable.stats(); + // Use the max timestamp to compute the new time range for the memtable. + // If `part_duration` is None, the new range will be None. + let new_time_range = + old_stats + .time_range() + .zip(part_duration) + .and_then(|(range, bucket)| { + partition_start_timestamp(range.1, bucket) + .and_then(|start| PartTimeRange::from_start_duration(start, bucket)) + }); + // Forks the latest partition, but compute the time range based on the new duration. let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata); let new_part = TimePartition { memtable, - time_range: old_part.time_range, + time_range: new_time_range, }; Self { inner: Mutex::new(PartitionsInner::with_partition( @@ -238,6 +252,16 @@ impl TimePartitions { inner.next_memtable_id } + /// Creates a new empty partition list from this list and a part_duration. + pub(crate) fn new_with_part_duration(&self, part_duration: Option) -> Self { + Self::new( + self.metadata.clone(), + self.builder.clone(), + self.next_memtable_id(), + part_duration, + ) + } + /// Returns all partitions. fn list_partitions(&self) -> PartitionVec { let inner = self.inner.lock().unwrap(); diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 1c7f2b7d4a25..78f8d1347522 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -15,6 +15,7 @@ //! Memtable version. use std::sync::Arc; +use std::time::Duration; use smallvec::SmallVec; use store_api::metadata::RegionMetadataRef; @@ -65,27 +66,41 @@ impl MemtableVersion { /// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable /// memtable. /// + /// It will switch to use the `time_window` provided. + /// /// Returns `None` if the mutable memtable is empty. pub(crate) fn freeze_mutable( &self, metadata: &RegionMetadataRef, + time_window: Option, ) -> Result> { if self.mutable.is_empty() { - // No need to freeze the mutable memtable. - return Ok(None); + // No need to freeze the mutable memtable, but we need to check the time window. + if self.mutable.part_duration() == time_window { + // If the time window is the same, we don't need to update it. + return Ok(None); + } + + // Update the time window. + let mutable = self.mutable.new_with_part_duration(time_window); + return Ok(Some(MemtableVersion { + mutable: Arc::new(mutable), + immutables: self.immutables.clone(), + })); } // Marks the mutable memtable as immutable so it can free the memory usage from our // soft limit. self.mutable.freeze()?; // Fork the memtable. - let mutable = Arc::new(self.mutable.fork(metadata)); + let mutable = Arc::new(self.mutable.fork(metadata, time_window)); - // Pushes the mutable memtable to immutable list. let mut immutables = SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions()); - self.mutable.list_memtables_to_small_vec(&mut immutables); immutables.extend(self.immutables.iter().cloned()); + // Pushes the mutable memtable to immutable list. + self.mutable.list_memtables_to_small_vec(&mut immutables); + Ok(Some(MemtableVersion { mutable, immutables, diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 8e78fa4ab351..188c314837c0 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -80,8 +80,12 @@ impl VersionControl { /// Freezes the mutable memtable if it is not empty. pub(crate) fn freeze_mutable(&self) -> Result<()> { let version = self.current().version; + let time_window = version.compaction_time_window; - let Some(new_memtables) = version.memtables.freeze_mutable(&version.metadata)? else { + let Some(new_memtables) = version + .memtables + .freeze_mutable(&version.metadata, time_window)? + else { return Ok(()); }; From 1baa533edb7774e575e2aaf225393acf105d9336 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Dec 2024 15:00:14 +0800 Subject: [PATCH 2/5] chore: only use provided duration if it is not None --- src/mito2/src/memtable/time_partition.rs | 8 ++++++-- src/mito2/src/test_util/memtable_util.rs | 5 ++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 21f0dd645977..27029c98ff35 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -168,8 +168,11 @@ impl TimePartitions { Ok(()) } - /// Forks latest partition and updates the partition duration. + /// Forks latest partition and updates the partition duration if `part_duration` is Some. pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option) -> Self { + // Fall back to the existing partition duration. + let part_duration = part_duration.or(self.part_duration); + let mut inner = self.inner.lock().unwrap(); let latest_part = inner .parts @@ -204,12 +207,13 @@ impl TimePartitions { memtable, time_range: new_time_range, }; + Self { inner: Mutex::new(PartitionsInner::with_partition( new_part, inner.next_memtable_id, )), - part_duration: self.part_duration, + part_duration, metadata: metadata.clone(), builder: self.builder.clone(), } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 1a0eacecf823..72e32c3f0ae5 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -14,7 +14,6 @@ //! Memtable test utilities. -use std::collections::BTreeMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; @@ -34,8 +33,8 @@ use crate::error::Result; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ - BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, - MemtableRanges, MemtableRef, MemtableStats, + BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, + MemtableRef, MemtableStats, }; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; From 967d44bec4f107a592773ca05586326a2b9895d5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Dec 2024 17:07:00 +0800 Subject: [PATCH 3/5] test: more tests --- src/mito2/src/memtable/time_partition.rs | 104 +++++++++++++++++++++-- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 27029c98ff35..357507fa0852 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -256,13 +256,16 @@ impl TimePartitions { inner.next_memtable_id } - /// Creates a new empty partition list from this list and a part_duration. + /// Creates a new empty partition list from this list and a `part_duration`. + /// It falls back to the old partition duration if `part_duration` is `None`. pub(crate) fn new_with_part_duration(&self, part_duration: Option) -> Self { + debug_assert!(self.is_empty()); + Self::new( self.metadata.clone(), self.builder.clone(), self.next_memtable_id(), - part_duration, + part_duration.or(self.part_duration), ) } @@ -475,9 +478,9 @@ mod tests { assert_eq!(1, partitions.num_partitions()); assert!(!partitions.is_empty()); - assert!(!partitions.is_empty()); let mut memtables = Vec::new(); partitions.list_memtables(&mut memtables); + assert_eq!(0, memtables[0].id()); let iter = memtables[0].iter(None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); @@ -531,9 +534,7 @@ mod tests { ); } - #[test] - fn test_write_multi_parts() { - let metadata = memtable_util::metadata_for_test(); + fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions { let builder = Arc::new(PartitionTreeMemtableBuilder::default()); let partitions = TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5))); @@ -562,9 +563,18 @@ mod tests { partitions.write(&kvs).unwrap(); assert_eq!(2, partitions.num_partitions()); + partitions + } + + #[test] + fn test_write_multi_parts() { + let metadata = memtable_util::metadata_for_test(); + let partitions = new_multi_partitions(&metadata); + let parts = partitions.list_partitions(); let iter = parts[0].memtable.iter(None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); + assert_eq!(0, parts[0].memtable.id()); assert_eq!( Timestamp::new_millisecond(0), parts[0].time_range.unwrap().min_timestamp @@ -575,6 +585,7 @@ mod tests { ); assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]); let iter = parts[1].memtable.iter(None, None).unwrap(); + assert_eq!(1, parts[1].memtable.id()); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[5000, 7000], ×tamps[..]); assert_eq!( @@ -586,4 +597,85 @@ mod tests { parts[1].time_range.unwrap().max_timestamp ); } + + #[test] + fn test_new_with_part_duration() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); + + let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5))); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(1, new_parts.next_memtable_id()); + + // Won't update the duration if it's None. + let new_parts = new_parts.new_with_part_duration(None); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + // Don't need to create new memtables. + assert_eq!(1, new_parts.next_memtable_id()); + + let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10))); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + // Don't need to create new memtables. + assert_eq!(1, new_parts.next_memtable_id()); + + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); + // Need to build a new memtable as duration is still None. + let new_parts = partitions.new_with_part_duration(None); + assert!(new_parts.part_duration().is_none()); + assert_eq!(2, new_parts.next_memtable_id()); + } + + #[test] + fn test_fork_empty() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder, 0, None); + partitions.freeze().unwrap(); + let new_parts = partitions.fork(&metadata, None); + assert!(new_parts.part_duration().is_none()); + assert_eq!(1, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(2, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5))); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(2, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(3, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, None); + // Won't update the duration. + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(3, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(4, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10))); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + assert_eq!(4, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(5, new_parts.next_memtable_id()); + } + + #[test] + fn test_fork_non_empty_none() { + let metadata = memtable_util::metadata_for_test(); + let partitions = new_multi_partitions(&metadata); + partitions.freeze().unwrap(); + + // Won't update the duration. + let new_parts = partitions.fork(&metadata, None); + assert!(new_parts.is_empty()); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(2, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(3, new_parts.next_memtable_id()); + + // Although we don't fork a memtable multiple times, we still add a test for it. + let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10))); + assert!(new_parts.is_empty()); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + assert_eq!(3, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(4, new_parts.next_memtable_id()); + } } From 40fe7a99b11896008b01b22cecd28c424950bfb7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Dec 2024 18:24:10 +0800 Subject: [PATCH 4/5] test: test compaction apply window --- src/mito2/src/engine/compaction_test.rs | 87 +++++++++++++++++++++++++ src/mito2/src/memtable/version.rs | 12 ++++ 2 files changed, 99 insertions(+) diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index efa98e6c7240..69687fcea00a 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() { let vec = collect_stream_ts(stream).await; assert_eq!((0..20).map(|v| v * 1000).collect::>(), vec); } + +#[tokio::test] +async fn test_compaction_update_time_window() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 3 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600 + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(0, scanner.num_memtables()); + // We keep at most two files. + assert_eq!( + 2, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + + // Flush a new SST and the time window is applied. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + + // Puts window 7200. + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows_for_key("a", 3600, 4000, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); + + // Puts window 3600. + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows_for_key("a", 2400, 3600, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(2, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); +} diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 78f8d1347522..f443396109d8 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -83,6 +83,11 @@ impl MemtableVersion { // Update the time window. let mutable = self.mutable.new_with_part_duration(time_window); + common_telemetry::debug!( + "Freeze empty memtable, update partition duration from {:?} to {:?}", + self.mutable.part_duration(), + time_window + ); return Ok(Some(MemtableVersion { mutable: Arc::new(mutable), immutables: self.immutables.clone(), @@ -93,6 +98,13 @@ impl MemtableVersion { // soft limit. self.mutable.freeze()?; // Fork the memtable. + if self.mutable.part_duration() != time_window { + common_telemetry::debug!( + "Fork memtable, update partition duration from {:?}, to {:?}", + self.mutable.part_duration(), + time_window + ); + } let mutable = Arc::new(self.mutable.fork(metadata, time_window)); let mut immutables = From 3f15eb8d138a8b599a9665627ce62f94d6b29d0d Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Dec 2024 19:17:55 +0800 Subject: [PATCH 5/5] style: fix clippy --- src/mito2/src/engine/compaction_test.rs | 4 ++-- src/mito2/src/memtable/time_partition.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 69687fcea00a..fadffe50e528 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -439,7 +439,7 @@ async fn test_compaction_update_time_window() { // Puts window 7200. let rows = Rows { - schema: column_schemas.to_vec(), + schema: column_schemas.clone(), rows: build_rows_for_key("a", 3600, 4000, 0), }; put_rows(&engine, region_id, rows).await; @@ -451,7 +451,7 @@ async fn test_compaction_update_time_window() { // Puts window 3600. let rows = Rows { - schema: column_schemas.to_vec(), + schema: column_schemas.clone(), rows: build_rows_for_key("a", 2400, 3600, 0), }; put_rows(&engine, region_id, rows).await; diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 357507fa0852..052fdca9bcf8 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -541,7 +541,7 @@ mod tests { assert_eq!(0, partitions.num_partitions()); let kvs = memtable_util::build_key_values( - &metadata, + metadata, "hello".to_string(), 0, &[2000, 0], @@ -553,7 +553,7 @@ mod tests { assert!(!partitions.is_empty()); let kvs = memtable_util::build_key_values( - &metadata, + metadata, "hello".to_string(), 0, &[3000, 7000, 4000, 5000],