Skip to content

Commit

Permalink
wait for mce comes up in recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 31, 2024
1 parent d852cf8 commit 1a76c8e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
43 changes: 43 additions & 0 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,49 @@ impl HummockEventHandler {
);
self.uploader.clear();

if let Some(CacheRefillerEvent {
pinned_version,
new_pinned_version,
}) = self.refiller.clear()
{
self.apply_version_update(pinned_version, new_pinned_version);
}

let mce = self.uploader.max_committed_epoch();

if mce < prev_epoch {
while self
.refiller
.last_new_pinned_version()
.map(|version| version.max_committed_epoch())
.unwrap_or(mce)
< prev_epoch
{
let version_update = self
.version_update_rx
.recv()
.await
.expect("should not be empty");
self.handle_version_update(version_update);
}
let CacheRefillerEvent {
pinned_version,
new_pinned_version,
} = self
.refiller
.clear()
.expect("must have some version update to raise the mce");
self.apply_version_update(pinned_version, new_pinned_version);
}

assert!(self.uploader.max_committed_epoch() >= prev_epoch);
if self.uploader.max_committed_epoch() > prev_epoch {
warn!(
mce = self.uploader.max_committed_epoch(),
prev_epoch, "mce higher than clear prev_epoch"
);
}

for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) {
send_sync_result(
result_sender,
Expand Down
11 changes: 11 additions & 0 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,17 @@ impl CacheRefiller {
pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
self.queue.back().map(|item| &item.event.new_pinned_version)
}

pub fn clear(&mut self) -> Option<CacheRefillerEvent> {
let Some(last_item) = self.queue.pop_back() else {
return None;
};
let mut event = last_item.event;
while let Some(item) = self.queue.pop_back() {
event.pinned_version = item.event.pinned_version;
}
Some(event)
}
}

impl CacheRefiller {
Expand Down

0 comments on commit 1a76c8e

Please sign in to comment.