feat: update partition duration of memtable using compaction window (#5197)

* feat: update partition duration of memtable using compaction window

* chore: only use provided duration if it is not None

* test: more tests

* test: test compaction apply window

* style: fix clippy
evenyag authored Dec 30, 2024
1 parent 89f2e15 commit 75e4f30
Showing 5 changed files with 256 additions and 18 deletions.
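
The second commit bullet ("only use provided duration if it is not None") is the precedence rule that shows up in the diff below as `part_duration.or(self.part_duration)` in `fork` and `new_with_part_duration`. A minimal sketch of that rule; the helper name `effective_duration` is illustrative and not part of the codebase:

```rust
use std::time::Duration;

/// Illustrative helper: a duration provided by compaction wins,
/// otherwise the partition list keeps its existing duration.
fn effective_duration(provided: Option<Duration>, existing: Option<Duration>) -> Option<Duration> {
    provided.or(existing)
}

fn main() {
    let five = Some(Duration::from_secs(5));
    let ten = Some(Duration::from_secs(10));
    assert_eq!(five, effective_duration(None, five)); // keep the old window
    assert_eq!(ten, effective_duration(ten, five));   // compaction window overrides
    assert_eq!(None, effective_duration(None, None)); // nothing known yet
}
```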
87 changes: 87 additions & 0 deletions src/mito2/src/engine/compaction_test.rs
@@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}

#[tokio::test]
async fn test_compaction_update_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);

env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;

let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 3 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600

let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);

let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(0, scanner.num_memtables());
// We keep at most two files.
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);

// Flush a new SST and the time window is applied.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600

// Puts window 7200.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 3600, 4000, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);

// Puts window 3600.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 2400, 3600, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
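
The `// window 3600` and `// window 7200` comments in this test label each write by the TWCS time window its timestamps fall into. A small sketch of that labeling, assuming left-closed, right-open windows of `window_secs` seconds named by their exclusive upper bound; `window_label` is a made-up helper for illustration only:

```rust
/// Illustrative only: label the time window for a timestamp in seconds,
/// assuming windows cover [k * w, (k + 1) * w) and are named by the upper bound.
fn window_label(ts_secs: i64, window_secs: i64) -> i64 {
    // Floor to the bucket start, then report the exclusive upper bound.
    let start = ts_secs.div_euclid(window_secs) * window_secs;
    start + window_secs
}

fn main() {
    assert_eq!(3600, window_label(0, 3600));    // rows 0..1200  -> "window 3600"
    assert_eq!(3600, window_label(3599, 3600)); // rows 2400..3600 -> "window 3600"
    assert_eq!(7200, window_label(3600, 3600)); // rows 3600..4000 -> "window 7200"
}
```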
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
@@ -70,7 +70,7 @@ impl Default for MemtableConfig {
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains. It is None if
/// The inclusive time range that this memtable contains. It is None if
/// and only if the memtable is empty.
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
142 changes: 131 additions & 11 deletions src/mito2/src/memtable/time_partition.rs
@@ -168,8 +168,11 @@ impl TimePartitions {
Ok(())
}

/// Forks latest partition.
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
/// Forks latest partition and updates the partition duration if `part_duration` is Some.
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
// Fall back to the existing partition duration.
let part_duration = part_duration.or(self.part_duration);

let mut inner = self.inner.lock().unwrap();
let latest_part = inner
.parts
@@ -178,24 +181,39 @@
.cloned();

let Some(old_part) = latest_part else {
// If there is no partition, then we create a new partition with the new duration.
return Self::new(
metadata.clone(),
self.builder.clone(),
inner.next_memtable_id,
self.part_duration,
part_duration,
);
};

let old_stats = old_part.memtable.stats();
// Use the max timestamp to compute the new time range for the memtable.
// If `part_duration` is None, the new range will be None.
let new_time_range =
old_stats
.time_range()
.zip(part_duration)
.and_then(|(range, bucket)| {
partition_start_timestamp(range.1, bucket)
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
});
// Forks the latest partition, but compute the time range based on the new duration.
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
let new_part = TimePartition {
memtable,
time_range: old_part.time_range,
time_range: new_time_range,
};

Self {
inner: Mutex::new(PartitionsInner::with_partition(
new_part,
inner.next_memtable_id,
)),
part_duration: self.part_duration,
part_duration,
metadata: metadata.clone(),
builder: self.builder.clone(),
}
@@ -238,6 +256,19 @@ impl TimePartitions {
inner.next_memtable_id
}

/// Creates a new empty partition list from this list and a `part_duration`.
/// It falls back to the old partition duration if `part_duration` is `None`.
pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
debug_assert!(self.is_empty());

Self::new(
self.metadata.clone(),
self.builder.clone(),
self.next_memtable_id(),
part_duration.or(self.part_duration),
)
}

/// Returns all partitions.
fn list_partitions(&self) -> PartitionVec {
let inner = self.inner.lock().unwrap();
@@ -447,9 +478,9 @@ mod tests {

assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
assert_eq!(0, memtables[0].id());

let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
@@ -503,16 +534,14 @@
);
}

#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
assert_eq!(0, partitions.num_partitions());

let kvs = memtable_util::build_key_values(
&metadata,
metadata,
"hello".to_string(),
0,
&[2000, 0],
@@ -524,7 +553,7 @@ mod tests {
assert!(!partitions.is_empty());

let kvs = memtable_util::build_key_values(
&metadata,
metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000, 5000],
@@ -534,9 +563,18 @@
partitions.write(&kvs).unwrap();
assert_eq!(2, partitions.num_partitions());

partitions
}

#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
let partitions = new_multi_partitions(&metadata);

let parts = partitions.list_partitions();
let iter = parts[0].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(0, parts[0].memtable.id());
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
@@ -547,6 +585,7 @@
);
assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
let iter = parts[1].memtable.iter(None, None).unwrap();
assert_eq!(1, parts[1].memtable.id());
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[5000, 7000], &timestamps[..]);
assert_eq!(
@@ -558,4 +597,85 @@
parts[1].time_range.unwrap().max_timestamp
);
}

#[test]
fn test_new_with_part_duration() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(1, new_parts.next_memtable_id());

// Won't update the duration if it's None.
let new_parts = new_parts.new_with_part_duration(None);
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
// Don't need to create new memtables.
assert_eq!(1, new_parts.next_memtable_id());

let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
// Don't need to create new memtables.
assert_eq!(1, new_parts.next_memtable_id());

let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
// Need to build a new memtable as duration is still None.
let new_parts = partitions.new_with_part_duration(None);
assert!(new_parts.part_duration().is_none());
assert_eq!(2, new_parts.next_memtable_id());
}

#[test]
fn test_fork_empty() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
partitions.freeze().unwrap();
let new_parts = partitions.fork(&metadata, None);
assert!(new_parts.part_duration().is_none());
assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
assert_eq!(2, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
assert_eq!(3, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, None);
// Won't update the duration.
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
assert_eq!(4, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
assert_eq!(5, new_parts.next_memtable_id());
}

#[test]
fn test_fork_non_empty_none() {
let metadata = memtable_util::metadata_for_test();
let partitions = new_multi_partitions(&metadata);
partitions.freeze().unwrap();

// Won't update the duration.
let new_parts = partitions.fork(&metadata, None);
assert!(new_parts.is_empty());
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
assert_eq!(3, new_parts.next_memtable_id());

// Although we don't fork a memtable multiple times, we still add a test for it.
let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
assert!(new_parts.is_empty());
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
assert_eq!(4, new_parts.next_memtable_id());
}
}
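
The key behavioral change in `fork` above is that the forked partition's time range is no longer copied from the old partition: it is recomputed from the old memtable's inclusive max timestamp and the (possibly updated) bucket via `partition_start_timestamp` and `PartTimeRange::from_start_duration`. A simplified sketch of that computation in plain seconds, ignoring the `Timestamp` unit handling and overflow checks of the real helpers; `forked_range` and `PartRange` are illustrative stand-ins:

```rust
use std::time::Duration;

/// Simplified stand-in for PartTimeRange: [start, end) in seconds.
#[derive(Debug, PartialEq)]
struct PartRange {
    start_secs: i64,
    end_secs: i64,
}

/// Sketch of the range a forked partition would get: align the old memtable's
/// max timestamp down to the start of its bucket, then span one bucket.
/// Returns None when the old memtable is empty or no duration is known,
/// mirroring the `zip(...).and_then(...)` chain in `fork`.
fn forked_range(max_ts_secs: Option<i64>, part_duration: Option<Duration>) -> Option<PartRange> {
    let (max_ts, bucket) = max_ts_secs.zip(part_duration)?;
    let bucket_secs = bucket.as_secs() as i64;
    let start = max_ts.div_euclid(bucket_secs) * bucket_secs;
    Some(PartRange {
        start_secs: start,
        end_secs: start + bucket_secs,
    })
}

fn main() {
    // Max timestamp 7 s with a 5 s bucket lands in [5, 10), matching the
    // [5000, 10000) ms range asserted in test_write_multi_parts.
    assert_eq!(
        Some(PartRange { start_secs: 5, end_secs: 10 }),
        forked_range(Some(7), Some(Duration::from_secs(5)))
    );
    // Empty memtable or unknown duration: the new partition has no fixed range.
    assert_eq!(None, forked_range(None, Some(Duration::from_secs(5))));
    assert_eq!(None, forked_range(Some(7), None));
}
```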
37 changes: 32 additions & 5 deletions src/mito2/src/memtable/version.rs
@@ -15,6 +15,7 @@
//! Memtable version.
use std::sync::Arc;
use std::time::Duration;

use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
@@ -65,27 +66,53 @@ impl MemtableVersion {
/// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
/// memtable.
///
/// It will switch to use the `time_window` provided.
///
/// Returns `None` if the mutable memtable is empty.
pub(crate) fn freeze_mutable(
&self,
metadata: &RegionMetadataRef,
time_window: Option<Duration>,
) -> Result<Option<MemtableVersion>> {
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return Ok(None);
// No need to freeze the mutable memtable, but we need to check the time window.
if self.mutable.part_duration() == time_window {
// If the time window is the same, we don't need to update it.
return Ok(None);
}

// Update the time window.
let mutable = self.mutable.new_with_part_duration(time_window);
common_telemetry::debug!(
"Freeze empty memtable, update partition duration from {:?} to {:?}",
self.mutable.part_duration(),
time_window
);
return Ok(Some(MemtableVersion {
mutable: Arc::new(mutable),
immutables: self.immutables.clone(),
}));
}

// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.freeze()?;
// Fork the memtable.
let mutable = Arc::new(self.mutable.fork(metadata));
if self.mutable.part_duration() != time_window {
common_telemetry::debug!(
"Fork memtable, update partition duration from {:?}, to {:?}",
self.mutable.part_duration(),
time_window
);
}
let mutable = Arc::new(self.mutable.fork(metadata, time_window));

// Pushes the mutable memtable to immutable list.
let mut immutables =
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables_to_small_vec(&mut immutables);
immutables.extend(self.immutables.iter().cloned());
// Pushes the mutable memtable to immutable list.
self.mutable.list_memtables_to_small_vec(&mut immutables);

Ok(Some(MemtableVersion {
mutable,
immutables,
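
Taken together, the new `freeze_mutable` has three outcomes: an empty mutable memtable with an unchanged window is left as is, an empty mutable with a different window is rebuilt via `new_with_part_duration`, and a non-empty mutable is frozen, moved to the immutable list, and forked with the requested window (falling back to the current one when the request is `None`). A decision-only sketch of that branching, using stand-in types rather than the real `MemtableVersion` API:

```rust
use std::time::Duration;

/// Stand-in for the three possible results of the updated freeze_mutable logic.
#[derive(Debug, PartialEq)]
enum FreezeOutcome {
    /// Mutable is empty and already uses the requested window: return Ok(None).
    Unchanged,
    /// Mutable is empty but the window differs: rebuild it via new_with_part_duration.
    ReplaceEmptyMutable,
    /// Mutable has data: freeze it, move its memtables to the immutable list,
    /// and fork with the requested window.
    FreezeAndFork,
}

fn freeze_outcome(
    mutable_is_empty: bool,
    current_window: Option<Duration>,
    requested_window: Option<Duration>,
) -> FreezeOutcome {
    if mutable_is_empty {
        if current_window == requested_window {
            FreezeOutcome::Unchanged
        } else {
            FreezeOutcome::ReplaceEmptyMutable
        }
    } else {
        FreezeOutcome::FreezeAndFork
    }
}

fn main() {
    let w = Some(Duration::from_secs(3600));
    assert_eq!(FreezeOutcome::Unchanged, freeze_outcome(true, w, w));
    assert_eq!(FreezeOutcome::ReplaceEmptyMutable, freeze_outcome(true, None, w));
    assert_eq!(FreezeOutcome::FreezeAndFork, freeze_outcome(false, None, w));
}
```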