Skip to content

Commit

Permalink
refactor(storage): cache recent versions and unify batch query and ti…
Browse files Browse the repository at this point in the history
…me travel
  • Loading branch information
wenym1 committed Sep 10, 2024
1 parent dc07da0 commit 84154f3
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 104 deletions.
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ workspace-hack = { path = "../workspace-hack" }
bincode = "1"
criterion = { workspace = true, features = ["async_futures", "async_tokio"] }
expect-test = "1"
risingwave_hummock_sdk = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["test"] }
risingwave_test_runner = { workspace = true }
uuid = { version = "1", features = ["v4"] }

Expand Down
2 changes: 0 additions & 2 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ pub struct TracedReadOptions {
pub retention_seconds: Option<u32>,
pub table_id: TracedTableId,
pub read_version_from_backup: bool,
pub read_version_from_time_travel: bool,
}

impl TracedReadOptions {
Expand All @@ -125,7 +124,6 @@ impl TracedReadOptions {
retention_seconds: None,
table_id: TracedTableId { table_id },
read_version_from_backup: false,
read_version_from_time_travel: false,
}
}
}
Expand Down
41 changes: 26 additions & 15 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::hummock::event_handler::{
ReadOnlyRwLockRef,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::store::version::{
HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
};
Expand Down Expand Up @@ -197,7 +198,7 @@ pub struct HummockEventHandler {
local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
recent_versions: Arc<ArcSwap<RecentVersions>>,
write_conflict_detector: Option<Arc<ConflictDetector>>,

uploader: HummockUploader,
Expand Down Expand Up @@ -355,7 +356,10 @@ impl HummockEventHandler {
hummock_event_rx,
version_update_rx,
version_update_notifier_tx,
pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)),
recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new(
pinned_version,
60,
))),
write_conflict_detector,
read_version_mapping,
local_read_version_mapping: Default::default(),
Expand All @@ -371,8 +375,8 @@ impl HummockEventHandler {
self.version_update_notifier_tx.clone()
}

pub fn pinned_version(&self) -> Arc<ArcSwap<PinnedVersion>> {
self.pinned_version.clone()
pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
self.recent_versions.clone()
}

pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
Expand Down Expand Up @@ -629,8 +633,9 @@ impl HummockEventHandler {
.metrics
.event_handler_on_apply_version_update
.start_timer();
self.pinned_version
.store(Arc::new(new_pinned_version.clone()));
self.recent_versions.rcu(|prev_recent_versions| {
prev_recent_versions.with_new_version(new_pinned_version.clone())
});

{
self.for_each_read_version(
Expand Down Expand Up @@ -663,7 +668,10 @@ impl HummockEventHandler {
// TODO: should we change the logic when supporting partial ckpt?
if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch(
self.pinned_version.load().visible_table_committed_epoch(),
self.recent_versions
.load()
.latest_version()
.visible_table_committed_epoch(),
));
}

Expand Down Expand Up @@ -789,13 +797,13 @@ impl HummockEventHandler {
is_replicated,
vnodes,
} => {
let pinned_version = self.pinned_version.load();
let pinned_version = self.recent_versions.load().latest_version().clone();
let instance_id = self.generate_instance_id();
let basic_read_version = Arc::new(RwLock::new(
HummockReadVersion::new_with_replication_option(
table_id,
instance_id,
(**pinned_version).clone(),
pinned_version,
is_replicated,
vnodes,
),
Expand Down Expand Up @@ -992,7 +1000,7 @@ mod tests {
);

let event_tx = event_handler.event_sender();
let latest_version = event_handler.pinned_version.clone();
let latest_version = event_handler.recent_versions.clone();
let latest_version_update_tx = event_handler.version_update_notifier_tx.clone();

let send_clear = |version_id| {
Expand All @@ -1018,12 +1026,15 @@ mod tests {
let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap();
assert_eq!(old_version.version(), initial_version.version());
assert_eq!(new_version.version(), &version1);
assert_eq!(latest_version.load().version(), initial_version.version());
assert_eq!(
latest_version.load().latest_version().version(),
initial_version.version()
);

let mut changed = latest_version_update_tx.subscribe();
refill_finish_tx.send(()).unwrap();
changed.changed().await.unwrap();
assert_eq!(latest_version.load().version(), &version1);
assert_eq!(latest_version.load().latest_version().version(), &version1);
}

// test recovery with pending refill task
Expand All @@ -1050,11 +1061,11 @@ mod tests {
refill_task_rx.recv().await.unwrap();
assert_eq!(old_version3.version(), &version2);
assert_eq!(new_version3.version(), &version3);
assert_eq!(latest_version.load().version(), &version1);
assert_eq!(latest_version.load().latest_version().version(), &version1);

let rx = send_clear(version3.id);
rx.await.unwrap();
assert_eq!(latest_version.load().version(), &version3);
assert_eq!(latest_version.load().latest_version().version(), &version3);
}

async fn assert_pending(fut: &mut (impl Future + Unpin)) {
Expand All @@ -1081,7 +1092,7 @@ mod tests {
)))
.unwrap();
rx.await.unwrap();
assert_eq!(latest_version.load().version(), &version5);
assert_eq!(latest_version.load().latest_version().version(), &version5);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/local_version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
// limitations under the License.

pub mod pinned_version;
pub mod recent_versions;
Loading

0 comments on commit 84154f3

Please sign in to comment.