Skip to content

Commit

Permalink
test: trigger compaction in test
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 16, 2023
1 parent f3017f6 commit 76e6b77
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ impl CompactionScheduler {

// The region can compact directly.
let request = status.new_compaction_request(self.request_sender.clone(), waiter);
// Mark the region as compacting.
status.compacting = true;
self.schedule_compaction_request(request)
}

Expand Down
2 changes: 0 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ impl Picker for TwcsPicker {
let outputs = self.build_output(&windows, active_window, time_window_size);

if outputs.is_empty() && expired_ssts.is_empty() {
// FIXME(yingwen): Need to remove the region from the scheduler.

// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
for waiter in waiters {
waiter.send(Ok(Output::AffectedRows(0)));
Expand Down
16 changes: 11 additions & 5 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,27 @@ async fn test_compaction_region() {
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 5 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 25..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;

let output = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));

let stream = engine
.handle_query(region_id, ScanRequest::default())
.await
.unwrap();
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();

let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ impl Scanner {
Scanner::Seq(seq_scan) => seq_scan.num_memtables(),
}
}

/// Returns SST file ids to scan.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
match self {
Scanner::Seq(seq_scan) => seq_scan.file_ids(),
}
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,9 @@ impl SeqScan {
pub(crate) fn num_files(&self) -> usize {
self.files.len()
}

/// Returns SST file ids to scan.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
self.files.iter().map(|file| file.file_id()).collect()
}
}

0 comments on commit 76e6b77

Please sign in to comment.