From 40fe7a99b11896008b01b22cecd28c424950bfb7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Dec 2024 18:24:10 +0800 Subject: [PATCH] test: test compaction apply window --- src/mito2/src/engine/compaction_test.rs | 87 +++++++++++++++++++++++++ src/mito2/src/memtable/version.rs | 12 ++++ 2 files changed, 99 insertions(+) diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index efa98e6c7240..69687fcea00a 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() { let vec = collect_stream_ts(stream).await; assert_eq!((0..20).map(|v| v * 1000).collect::>(), vec); } + +#[tokio::test] +async fn test_compaction_update_time_window() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 3 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600 + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(0, scanner.num_memtables()); + // We keep at most two files. + assert_eq!( + 2, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + + // Flush a new SST and the time window is applied. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + + // Puts window 7200. + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows_for_key("a", 3600, 4000, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); + + // Puts window 3600. + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows_for_key("a", 2400, 3600, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(2, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); +} diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 78f8d1347522..f443396109d8 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -83,6 +83,11 @@ impl MemtableVersion { // Update the time window. let mutable = self.mutable.new_with_part_duration(time_window); + common_telemetry::debug!( + "Freeze empty memtable, update partition duration from {:?} to {:?}", + self.mutable.part_duration(), + time_window + ); return Ok(Some(MemtableVersion { mutable: Arc::new(mutable), immutables: self.immutables.clone(), @@ -93,6 +98,13 @@ impl MemtableVersion { // soft limit. self.mutable.freeze()?; // Fork the memtable. + if self.mutable.part_duration() != time_window { + common_telemetry::debug!( + "Fork memtable, update partition duration from {:?}, to {:?}", + self.mutable.part_duration(), + time_window + ); + } let mutable = Arc::new(self.mutable.fork(metadata, time_window)); let mut immutables =