diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0a2ec880d8a4..ffff9bebe4b8 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -116,6 +116,8 @@ impl CompactionScheduler { // The region can compact directly. let request = status.new_compaction_request(self.request_sender.clone(), waiter); + // Mark the region as compacting. + status.compacting = true; self.schedule_compaction_request(request) } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index f02884381be5..9812e2c00f5d 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -156,8 +156,6 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { - // FIXME(yingwen): Need to remove the region from the scheduler. - // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. for waiter in waiters { waiter.send(Ok(Output::AffectedRows(0))); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 9c196d78b900..9a88ecbc4989 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -121,10 +121,12 @@ async fn test_compaction_region() { .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); + // Flush 5 SSTs for compaction. put_and_flush(&engine, region_id, &column_schemas, 0..10).await; put_and_flush(&engine, region_id, &column_schemas, 10..20).await; put_and_flush(&engine, region_id, &column_schemas, 20..30).await; - delete_and_flush(&engine, region_id, &column_schemas, 25..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 15..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let output = engine .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) @@ -132,10 +134,14 @@ async fn test_compaction_region() { .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); - let stream = engine - .handle_query(region_id, ScanRequest::default()) - .await - .unwrap(); + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 1, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 752821834e94..be8b318d83ff 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -59,6 +59,13 @@ impl Scanner { Scanner::Seq(seq_scan) => seq_scan.num_memtables(), } } + + /// Returns SST file ids to scan. + pub(crate) fn file_ids(&self) -> Vec { + match self { + Scanner::Seq(seq_scan) => seq_scan.file_ids(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 28e5249fecc8..148cb3777142 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -156,4 +156,9 @@ impl SeqScan { pub(crate) fn num_files(&self) -> usize { self.files.len() } + + /// Returns SST file ids to scan. + pub(crate) fn file_ids(&self) -> Vec { + self.files.iter().map(|file| file.file_id()).collect() + } }