Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 18, 2024
1 parent 826da0b commit e4b390c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
47 changes: 26 additions & 21 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::watch;
use tracing::warn;

use crate::expr::InlineNowProcTime;
use crate::meta_client::FrontendMetaClient;
Expand Down Expand Up @@ -180,35 +179,41 @@ impl HummockSnapshotManager {
}

pub fn init(&self, version: FrontendHummockVersion) {
self.worker_sender
.send(Operation::Pin(version.id, version.max_committed_epoch))
.unwrap();
// Then set the latest snapshot.
let snapshot = Arc::new(PinnedSnapshot {
value: version,
unpin_sender: self.worker_sender.clone(),
});
if self.latest_snapshot.send(snapshot).is_err() {
warn!("fail to set init version");
}
self.update_inner(|_| Some(version));
}

/// Update the latest snapshot.
///
/// Should only be called by the observer manager.
pub fn update(&self, deltas: HummockVersionDeltas) {
self.latest_snapshot.send_if_modified(move |old_snapshot| {
self.update_inner(|old_snapshot| {
if deltas.version_deltas.is_empty() {
return false;
return None;
}
let snapshot = {
let mut snapshot = old_snapshot.value.clone();
for delta in deltas.version_deltas {
snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
}
snapshot
};
let mut snapshot = old_snapshot.clone();
for delta in deltas.version_deltas {
snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
}
Some(snapshot)
})
}

fn update_inner(
&self,
get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
) {
self.latest_snapshot.send_if_modified(move |old_snapshot| {
let new_snapshot = get_new_snapshot(&old_snapshot.value);
let Some(snapshot) = new_snapshot else {
return false;
};
if snapshot.id <= old_snapshot.value.id {
assert_eq!(
snapshot.id, old_snapshot.value.id,
"receive stale frontend version"
);
return false;
}
// First tell the worker that a new snapshot is going to be pinned.
self.worker_sender
.send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch))
Expand Down
5 changes: 4 additions & 1 deletion src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ where
}

impl<T> TableChangeLogCommon<T> {
pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLogCommon<T>] {
pub fn filter_epoch(
&self,
(min_epoch, max_epoch): (u64, u64),
) -> &[EpochNewChangeLogCommon<T>] {
let start = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
});
Expand Down

0 comments on commit e4b390c

Please sign in to comment.