Skip to content

Commit

Permalink
fix(storage): add seal epoch when no data (#16192)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Apr 8, 2024
1 parent a397233 commit 3ddec23
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ impl SealedData {
}

struct SyncingData {
sync_epoch: HummockEpoch,
// newer epochs come first
epochs: Vec<HummockEpoch>,
// TODO: may replace `TryJoinAll` with a future that will abort other join handles once
Expand All @@ -688,6 +687,12 @@ struct SyncingData {
table_watermarks: HashMap<TableId, TableWatermarks>,
}

impl SyncingData {
fn sync_epoch(&self) -> HummockEpoch {
*self.epochs.first().expect("non-empty")
}
}

pub struct SyncedData {
pub staging_ssts: Vec<StagingSstableInfo>,
pub table_watermarks: HashMap<TableId, TableWatermarks>,
Expand Down Expand Up @@ -872,25 +877,29 @@ impl HummockUploader {
self.max_sealed_epoch
);
self.max_sealed_epoch = epoch;
if let Some((&smallest_unsealed_epoch, _)) = self.unsealed_data.first_key_value() {
assert!(
smallest_unsealed_epoch >= epoch,
"some epoch {} older than epoch to seal {}",
smallest_unsealed_epoch,
epoch
);
if smallest_unsealed_epoch == epoch {
let (epoch, unsealed_data) = self
.unsealed_data
.pop_first()
.expect("we have checked non-empty");
self.sealed_data.seal_new_epoch(epoch, unsealed_data);
let unsealed_data =
if let Some((&smallest_unsealed_epoch, _)) = self.unsealed_data.first_key_value() {
assert!(
smallest_unsealed_epoch >= epoch,
"some epoch {} older than epoch to seal {}",
smallest_unsealed_epoch,
epoch
);
if smallest_unsealed_epoch == epoch {
let (_, unsealed_data) = self
.unsealed_data
.pop_first()
.expect("we have checked non-empty");
unsealed_data
} else {
debug!("epoch {} to seal has no data", epoch);
UnsealedEpochData::default()
}
} else {
debug!("epoch {} to seal has no data", epoch);
}
} else {
debug!("epoch {} to seal has no data", epoch);
}
UnsealedEpochData::default()
};
self.sealed_data.seal_new_epoch(epoch, unsealed_data);
}

pub(crate) fn start_merge_imms(&mut self, sealed_epoch: HummockEpoch) {
Expand Down Expand Up @@ -983,6 +992,8 @@ impl HummockUploader {
"after flush, imms must be empty"
);

assert_eq!(epoch, *epochs.front().expect("non-empty epoch"));

let try_join_all_upload_task = if uploading_tasks.is_empty() {
None
} else {
Expand All @@ -991,7 +1002,6 @@ impl HummockUploader {

self.syncing_data.push_front(SyncingData {
epochs: epochs.into_iter().collect(),
sync_epoch: epoch,
uploading_tasks: try_join_all_upload_task,
uploaded: uploaded_data,
table_watermarks,
Expand Down Expand Up @@ -1111,7 +1121,7 @@ impl HummockUploader {
.stats
.uploader_syncing_epoch_count
.set(self.syncing_data.len() as _);
let epoch = syncing_data.sync_epoch;
let epoch = syncing_data.sync_epoch();

let newly_uploaded_sstable_infos = match &result {
Ok(sstable_infos) => sstable_infos.clone(),
Expand Down Expand Up @@ -1487,7 +1497,7 @@ mod tests {
assert!(uploader.sealed_data.spilled_data.is_empty());
assert_eq!(1, uploader.syncing_data.len());
let syncing_data = uploader.syncing_data.front().unwrap();
assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch);
assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch());
assert!(syncing_data.uploaded.is_empty());
assert!(syncing_data.uploading_tasks.is_some());

Expand Down

0 comments on commit 3ddec23

Please sign in to comment.