Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 1, 2024
1 parent 1a76c8e commit 184bb9a
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 38 deletions.
104 changes: 68 additions & 36 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arc_swap::ArcSwap;
use await_tree::InstrumentAwait;
use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use thiserror_ext::AsReport;
use tokio::spawn;
Expand Down Expand Up @@ -401,46 +402,61 @@ impl HummockEventHandler {

async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) {
info!(
"handle clear event. max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}",
self.uploader.max_committed_epoch(),
self.uploader.max_synced_epoch(),
self.uploader.max_sealed_epoch(),
prev_epoch,
max_committed_epoch = self.uploader.max_committed_epoch(),
max_synced_epoch = self.uploader.max_synced_epoch(),
max_sealed_epoch = self.uploader.max_sealed_epoch(),
"handle clear event"
);
self.uploader.clear();

if let Some(CacheRefillerEvent {
pinned_version,
new_pinned_version,
}) = self.refiller.clear()
{
self.apply_version_update(pinned_version, new_pinned_version);
}
let current_version = self.uploader.hummock_version();

let mce = self.uploader.max_committed_epoch();
if current_version.max_committed_epoch() < prev_epoch {
let mut latest_version = if let Some(CacheRefillerEvent {
pinned_version,
new_pinned_version,
}) = self.refiller.clear()
{
assert_eq!(
current_version.id(),
pinned_version.id(),
"refiller earliest version {:?} not equal to current version {:?}",
pinned_version.version(),
current_version.version()
);

if mce < prev_epoch {
while self
.refiller
.last_new_pinned_version()
.map(|version| version.max_committed_epoch())
.unwrap_or(mce)
< prev_epoch
info!(
prev_epoch,
current_mce = current_version.max_committed_epoch(),
refiller_mce = new_pinned_version.max_committed_epoch(),
"refiller is clear in recovery"
);

Some(new_pinned_version)
} else {
None
};

while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version)
&& latest_version_ref.max_committed_epoch() < prev_epoch
{
let version_update = self
.version_update_rx
.recv()
.await
.expect("should not be empty");
self.handle_version_update(version_update);
latest_version = Some(Self::resolve_version_update_info(
latest_version_ref.clone(),
version_update,
None,
));
}
let CacheRefillerEvent {
pinned_version,
new_pinned_version,
} = self
.refiller
.clear()
.expect("must have some version update to raise the mce");
self.apply_version_update(pinned_version, new_pinned_version);

self.apply_version_update(
current_version.clone(),
latest_version.expect("must have some version update to raise the mce"),
);
}

assert!(self.uploader.max_committed_epoch() >= prev_epoch);
Expand Down Expand Up @@ -476,24 +492,43 @@ impl HummockEventHandler {
let _ = notifier.send(()).inspect_err(|e| {
error!("failed to notify completion of clear event: {:?}", e);
});

info!(prev_epoch, "clear finished");
}

fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
let pinned_version = self
.refiller
.last_new_pinned_version()
.cloned()
.map(Arc::new)
.unwrap_or_else(|| self.pinned_version.load().clone());
.unwrap_or_else(|| self.uploader.hummock_version().clone());

let mut sst_delta_infos = vec![];
let new_pinned_version = Self::resolve_version_update_info(
pinned_version.clone(),
version_payload,
Some(&mut sst_delta_infos),
);

self.refiller
.start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
}

fn resolve_version_update_info(
pinned_version: PinnedVersion,
version_payload: HummockVersionUpdate,
mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
) -> PinnedVersion {
let newly_pinned_version = match version_payload {
HummockVersionUpdate::VersionDeltas(version_deltas) => {
let mut version_to_apply = pinned_version.version().clone();
for version_delta in &version_deltas {
assert_eq!(version_to_apply.id, version_delta.prev_id);
if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch {
sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta);
if let Some(sst_delta_infos) = &mut sst_delta_infos {
**sst_delta_infos =
version_to_apply.build_sst_delta_infos(version_delta);
}
}
version_to_apply.apply_version_delta(version_delta);
}
Expand All @@ -505,15 +540,12 @@ impl HummockEventHandler {

validate_table_key_range(&newly_pinned_version);

let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version);

self.refiller
.start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
pinned_version.new_pin_version(newly_pinned_version)
}

fn apply_version_update(
&mut self,
pinned_version: Arc<PinnedVersion>,
pinned_version: PinnedVersion,
new_pinned_version: PinnedVersion,
) {
self.pinned_version
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl CacheRefiller {
pub fn start_cache_refill(
&mut self,
deltas: Vec<SstDeltaInfo>,
pinned_version: Arc<PinnedVersion>,
pinned_version: PinnedVersion,
new_pinned_version: PinnedVersion,
) {
let task = CacheRefillTask {
Expand Down Expand Up @@ -296,7 +296,7 @@ impl CacheRefiller {
}

pub struct CacheRefillerEvent {
pub pinned_version: Arc<PinnedVersion>,
pub pinned_version: PinnedVersion,
pub new_pinned_version: PinnedVersion,
}

Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,10 @@ impl HummockUploader {
self.context.pinned_version.max_committed_epoch()
}

pub(crate) fn hummock_version(&self) -> &PinnedVersion {
&self.context.pinned_version
}

pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> {
assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch);
self.synced_data.get(&epoch)
Expand Down

0 comments on commit 184bb9a

Please sign in to comment.