Skip to content

Commit

Permalink
fix(storage): fix spill table id too strict assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 18, 2024
1 parent 10220ed commit f985d6c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
7 changes: 6 additions & 1 deletion src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,12 @@ impl UploaderData {
.map(|task_id| {
let (sst, spill_table_ids) =
self.spilled_data.remove(task_id).expect("should exist");
assert_eq!(spill_table_ids, table_ids);
assert!(
spill_table_ids.is_subset(&table_ids),
"spilled tabled ids {:?} not a subset of sync table id {:?}",
spill_table_ids,
table_ids
);
sst
})
.collect();
Expand Down
16 changes: 9 additions & 7 deletions src/storage/src/hummock/event_handler/uploader/spiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ mod tests {
uploader.add_imm(instance_id2, imm2_4_1.clone());

// uploader state:
// table_id1:
// table_id1: table_id2:
// instance_id1_1: instance_id1_2: instance_id2
// epoch1 imm1_1_1 imm1_2_1 | imm2_1 |
// epoch2 imms1_1_2(size 3) | |
Expand Down Expand Up @@ -314,12 +314,12 @@ mod tests {
uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test());

// uploader state:
// table_id1:
// table_id1: table_id2:
// instance_id1_1: instance_id1_2: instance_id2
// epoch1 spill(imm1_1_1, imm1_2_1, size 2) | spill(imm2_1, size 1) |
// epoch2 spill(imms1_1_2, size 3) | |
// epoch3 spill(imms_1_2_3, size 4) | |
// epoch4 spill(imm1_1_4, imm1_2_4, size 2) | spill(imm2_4_1, size 1), imm2_4_2 |
// epoch4 spill(imm1_1_4, imm1_2_4, size 2) spill(imm2_4_1, size 1), imm2_4_2 |

let (sync_tx1_1, sync_rx1_1) = oneshot::channel();
uploader.start_sync_epoch(epoch1, sync_tx1_1, HashSet::from_iter([table_id1]));
Expand All @@ -339,10 +339,6 @@ mod tests {
vec![imm2_4_2.batch_id()],
)]));

let (sync_tx4, mut sync_rx4) = oneshot::channel();
uploader.start_sync_epoch(epoch4, sync_tx4, HashSet::from_iter([table_id1, table_id2]));
await_start2_4_2.await;

finish_tx2_4_1.send(()).unwrap();
finish_tx3.send(()).unwrap();
finish_tx2.send(()).unwrap();
Expand Down Expand Up @@ -399,6 +395,12 @@ mod tests {
finish_tx2_1.send(()).unwrap();
let sst = uploader.next_uploaded_sst().await;
assert_eq!(&imm_ids1_4, sst.imm_ids());

// trigger the sync after the spill task is finished and acked to cover the case
let (sync_tx4, mut sync_rx4) = oneshot::channel();
uploader.start_sync_epoch(epoch4, sync_tx4, HashSet::from_iter([table_id1, table_id2]));
await_start2_4_2.await;

let sst = uploader.next_uploaded_sst().await;
assert_eq!(&imm_ids2_1, sst.imm_ids());
let sst = uploader.next_uploaded_sst().await;
Expand Down

0 comments on commit f985d6c

Please sign in to comment.