Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update partition duration of memtable using compaction window #5197

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}

#[tokio::test]
async fn test_compaction_update_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);

env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;

let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 3 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600

let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);

let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(0, scanner.num_memtables());
// We keep at most two files.
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);

// Flush a new SST and the time window is applied.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600

// Puts window 7200.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 3600, 4000, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);

// Puts window 3600.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 2400, 3600, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for MemtableConfig {
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains. It is None if
/// The inclusive time range that this memtable contains. It is None if
/// and only if the memtable is empty.
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
Expand Down
142 changes: 131 additions & 11 deletions src/mito2/src/memtable/time_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@ impl TimePartitions {
Ok(())
}

/// Forks latest partition.
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
/// Forks latest partition and updates the partition duration if `part_duration` is Some.
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
// Fall back to the existing partition duration.
let part_duration = part_duration.or(self.part_duration);

let mut inner = self.inner.lock().unwrap();
let latest_part = inner
.parts
Expand All @@ -178,24 +181,39 @@ impl TimePartitions {
.cloned();

let Some(old_part) = latest_part else {
// If there is no partition, then we create a new partition with the new duration.
return Self::new(
metadata.clone(),
self.builder.clone(),
inner.next_memtable_id,
self.part_duration,
part_duration,
);
};

let old_stats = old_part.memtable.stats();
// Use the max timestamp to compute the new time range for the memtable.
// If `part_duration` is None, the new range will be None.
let new_time_range =
old_stats
.time_range()
.zip(part_duration)
.and_then(|(range, bucket)| {
partition_start_timestamp(range.1, bucket)
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
});
// Forks the latest partition, but compute the time range based on the new duration.
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
let new_part = TimePartition {
memtable,
time_range: old_part.time_range,
time_range: new_time_range,
};

Self {
inner: Mutex::new(PartitionsInner::with_partition(
new_part,
inner.next_memtable_id,
)),
part_duration: self.part_duration,
part_duration,
metadata: metadata.clone(),
builder: self.builder.clone(),
}
Expand Down Expand Up @@ -238,6 +256,19 @@ impl TimePartitions {
inner.next_memtable_id
}

/// Creates a new empty partition list from this list and a `part_duration`.
/// It falls back to the old partition duration if `part_duration` is `None`.
pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
debug_assert!(self.is_empty());

Self::new(
self.metadata.clone(),
self.builder.clone(),
self.next_memtable_id(),
part_duration.or(self.part_duration),
)
}

/// Returns all partitions.
fn list_partitions(&self) -> PartitionVec {
let inner = self.inner.lock().unwrap();
Expand Down Expand Up @@ -447,9 +478,9 @@ mod tests {

assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
assert!(!partitions.is_empty());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
assert_eq!(0, memtables[0].id());

let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
Expand Down Expand Up @@ -503,16 +534,14 @@ mod tests {
);
}

#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
assert_eq!(0, partitions.num_partitions());

let kvs = memtable_util::build_key_values(
&metadata,
metadata,
"hello".to_string(),
0,
&[2000, 0],
Expand All @@ -524,7 +553,7 @@ mod tests {
assert!(!partitions.is_empty());

let kvs = memtable_util::build_key_values(
&metadata,
metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000, 5000],
Expand All @@ -534,9 +563,18 @@ mod tests {
partitions.write(&kvs).unwrap();
assert_eq!(2, partitions.num_partitions());

partitions
}

#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
let partitions = new_multi_partitions(&metadata);

let parts = partitions.list_partitions();
let iter = parts[0].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(0, parts[0].memtable.id());
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
Expand All @@ -547,6 +585,7 @@ mod tests {
);
assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
let iter = parts[1].memtable.iter(None, None).unwrap();
assert_eq!(1, parts[1].memtable.id());
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[5000, 7000], &timestamps[..]);
assert_eq!(
Expand All @@ -558,4 +597,85 @@ mod tests {
parts[1].time_range.unwrap().max_timestamp
);
}

#[test]
fn test_new_with_part_duration() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(1, new_parts.next_memtable_id());

// Won't update the duration if it's None.
let new_parts = new_parts.new_with_part_duration(None);
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
// Don't need to create new memtables.
assert_eq!(1, new_parts.next_memtable_id());

let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
// Don't need to create new memtables.
assert_eq!(1, new_parts.next_memtable_id());

let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
// Need to build a new memtable as duration is still None.
let new_parts = partitions.new_with_part_duration(None);
assert!(new_parts.part_duration().is_none());
assert_eq!(2, new_parts.next_memtable_id());
}

#[test]
fn test_fork_empty() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
partitions.freeze().unwrap();
let new_parts = partitions.fork(&metadata, None);
assert!(new_parts.part_duration().is_none());
assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
assert_eq!(2, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
assert_eq!(3, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, None);
// Won't update the duration.
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
assert_eq!(4, new_parts.next_memtable_id());

new_parts.freeze().unwrap();
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
assert_eq!(5, new_parts.next_memtable_id());
}

#[test]
fn test_fork_non_empty_none() {
let metadata = memtable_util::metadata_for_test();
let partitions = new_multi_partitions(&metadata);
partitions.freeze().unwrap();

// Won't update the duration.
let new_parts = partitions.fork(&metadata, None);
assert!(new_parts.is_empty());
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
assert_eq!(3, new_parts.next_memtable_id());

// Although we don't fork a memtable multiple times, we still add a test for it.
let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
assert!(new_parts.is_empty());
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
assert_eq!(4, new_parts.next_memtable_id());
}
}
37 changes: 32 additions & 5 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Memtable version.

use std::sync::Arc;
use std::time::Duration;

use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
Expand Down Expand Up @@ -65,27 +66,53 @@ impl MemtableVersion {
/// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
/// memtable.
///
/// It will switch to use the `time_window` provided.
///
/// Returns `None` if the mutable memtable is empty.
pub(crate) fn freeze_mutable(
&self,
metadata: &RegionMetadataRef,
time_window: Option<Duration>,
) -> Result<Option<MemtableVersion>> {
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return Ok(None);
// No need to freeze the mutable memtable, but we need to check the time window.
if self.mutable.part_duration() == time_window {
// If the time window is the same, we don't need to update it.
return Ok(None);
}

// Update the time window.
let mutable = self.mutable.new_with_part_duration(time_window);
common_telemetry::debug!(
"Freeze empty memtable, update partition duration from {:?} to {:?}",
self.mutable.part_duration(),
time_window
);
return Ok(Some(MemtableVersion {
mutable: Arc::new(mutable),
immutables: self.immutables.clone(),
}));
}

// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.freeze()?;
// Fork the memtable.
let mutable = Arc::new(self.mutable.fork(metadata));
if self.mutable.part_duration() != time_window {
common_telemetry::debug!(
"Fork memtable, update partition duration from {:?}, to {:?}",
self.mutable.part_duration(),
time_window
);
}
let mutable = Arc::new(self.mutable.fork(metadata, time_window));

// Pushes the mutable memtable to immutable list.
let mut immutables =
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables_to_small_vec(&mut immutables);
immutables.extend(self.immutables.iter().cloned());
// Pushes the mutable memtable to immutable list.
self.mutable.list_memtables_to_small_vec(&mut immutables);

Ok(Some(MemtableVersion {
mutable,
immutables,
Expand Down
Loading
Loading