Skip to content

Commit

Permalink
refactor(storage): cache recent versions and unify batch query and time travel (#18477)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 11, 2024
1 parent 5065bb4 commit c882bda
Show file tree
Hide file tree
Showing 16 changed files with 500 additions and 140 deletions.
4 changes: 4 additions & 0 deletions e2e_test/time_travel/syntax.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ SET QUERY_MODE TO local;
statement ok
CREATE TABLE t (k INT);

query I
SELECT * FROM t;
----

query error
SELECT * FROM t FOR SYSTEM_TIME AS OF 963716300;
----
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,9 @@ pub struct StorageConfig {
#[serde(default)]
pub prefetch_buffer_capacity_mb: Option<usize>,

#[serde(default)]
pub max_cached_recent_versions_number: Option<usize>,

/// max prefetch block number
#[serde(default = "default::storage::max_prefetch_block_number")]
pub max_prefetch_block_number: usize,
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ This page is automatically generated by `./risedev generate-example-config`
| enable_fast_compaction | | true |
| high_priority_ratio_in_percent | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead. | |
| imm_merge_threshold | The threshold for the number of immutable memtables to merge to a new imm. | 0 |
| max_cached_recent_versions_number | | |
| max_concurrent_compaction_task_number | | 16 |
| max_prefetch_block_number | max prefetch block number | 16 |
| max_preload_io_retry_times | | 3 |
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ workspace-hack = { path = "../workspace-hack" }
bincode = "1"
criterion = { workspace = true, features = ["async_futures", "async_tokio"] }
expect-test = "1"
risingwave_hummock_sdk = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["test"] }
risingwave_test_runner = { workspace = true }
uuid = { version = "1", features = ["v4"] }

Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn bench_table_watermarks(c: &mut Criterion) {
let mut pinned_version =
PinnedVersion::new(versions.pop_front().unwrap(), unbounded_channel().0);
while let Some(version) = versions.pop_front() {
pinned_version = pinned_version.new_pin_version(version);
pinned_version = pinned_version.new_pin_version(version).unwrap();
}
},
BatchSize::SmallInput,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct TracedReadOptions {
pub retention_seconds: Option<u32>,
pub table_id: TracedTableId,
pub read_version_from_backup: bool,
pub read_version_from_time_travel: bool,
pub read_committed: bool,
}

impl TracedReadOptions {
Expand All @@ -125,7 +125,7 @@ impl TracedReadOptions {
retention_seconds: None,
table_id: TracedTableId { table_id },
read_version_from_backup: false,
read_version_from_time_travel: false,
read_committed: false,
}
}
}
Expand Down
70 changes: 41 additions & 29 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::hummock::event_handler::{
ReadOnlyRwLockRef,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::store::version::{
HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
};
Expand Down Expand Up @@ -197,7 +198,7 @@ pub struct HummockEventHandler {
local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
recent_versions: Arc<ArcSwap<RecentVersions>>,
write_conflict_detector: Option<Arc<ConflictDetector>>,

uploader: HummockUploader,
Expand Down Expand Up @@ -355,7 +356,10 @@ impl HummockEventHandler {
hummock_event_rx,
version_update_rx,
version_update_notifier_tx,
pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)),
recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new(
pinned_version,
storage_opts.max_cached_recent_versions_number,
))),
write_conflict_detector,
read_version_mapping,
local_read_version_mapping: Default::default(),
Expand All @@ -371,8 +375,8 @@ impl HummockEventHandler {
self.version_update_notifier_tx.clone()
}

pub fn pinned_version(&self) -> Arc<ArcSwap<PinnedVersion>> {
self.pinned_version.clone()
pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
self.recent_versions.clone()
}

pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
Expand Down Expand Up @@ -529,17 +533,18 @@ impl HummockEventHandler {
.await
.expect("should not be empty");
let prev_version_id = latest_version_ref.id();
let new_version = Self::resolve_version_update_info(
if let Some(new_version) = Self::resolve_version_update_info(
latest_version_ref.clone(),
version_update,
None,
);
info!(
?prev_version_id,
new_version_id = ?new_version.id(),
"recv new version"
);
latest_version = Some(new_version);
) {
info!(
?prev_version_id,
new_version_id = ?new_version.id(),
"recv new version"
);
latest_version = Some(new_version);
}
}

self.apply_version_update(
Expand Down Expand Up @@ -582,21 +587,21 @@ impl HummockEventHandler {
.unwrap_or_else(|| self.uploader.hummock_version().clone());

let mut sst_delta_infos = vec![];
let new_pinned_version = Self::resolve_version_update_info(
if let Some(new_pinned_version) = Self::resolve_version_update_info(
pinned_version.clone(),
version_payload,
Some(&mut sst_delta_infos),
);

self.refiller
.start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
) {
self.refiller
.start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
}
}

fn resolve_version_update_info(
pinned_version: PinnedVersion,
version_payload: HummockVersionUpdate,
mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
) -> PinnedVersion {
) -> Option<PinnedVersion> {
let newly_pinned_version = match version_payload {
HummockVersionUpdate::VersionDeltas(version_deltas) => {
let mut version_to_apply = pinned_version.version().clone();
Expand Down Expand Up @@ -629,8 +634,9 @@ impl HummockEventHandler {
.metrics
.event_handler_on_apply_version_update
.start_timer();
self.pinned_version
.store(Arc::new(new_pinned_version.clone()));
self.recent_versions.rcu(|prev_recent_versions| {
prev_recent_versions.with_new_version(new_pinned_version.clone())
});

{
self.for_each_read_version(
Expand Down Expand Up @@ -663,7 +669,10 @@ impl HummockEventHandler {
// TODO: should we change the logic when supporting partial ckpt?
if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch(
self.pinned_version.load().visible_table_committed_epoch(),
self.recent_versions
.load()
.latest_version()
.visible_table_committed_epoch(),
));
}

Expand Down Expand Up @@ -789,13 +798,13 @@ impl HummockEventHandler {
is_replicated,
vnodes,
} => {
let pinned_version = self.pinned_version.load();
let pinned_version = self.recent_versions.load().latest_version().clone();
let instance_id = self.generate_instance_id();
let basic_read_version = Arc::new(RwLock::new(
HummockReadVersion::new_with_replication_option(
table_id,
instance_id,
(**pinned_version).clone(),
pinned_version,
is_replicated,
vnodes,
),
Expand Down Expand Up @@ -992,7 +1001,7 @@ mod tests {
);

let event_tx = event_handler.event_sender();
let latest_version = event_handler.pinned_version.clone();
let latest_version = event_handler.recent_versions.clone();
let latest_version_update_tx = event_handler.version_update_notifier_tx.clone();

let send_clear = |version_id| {
Expand All @@ -1018,12 +1027,15 @@ mod tests {
let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap();
assert_eq!(old_version.version(), initial_version.version());
assert_eq!(new_version.version(), &version1);
assert_eq!(latest_version.load().version(), initial_version.version());
assert_eq!(
latest_version.load().latest_version().version(),
initial_version.version()
);

let mut changed = latest_version_update_tx.subscribe();
refill_finish_tx.send(()).unwrap();
changed.changed().await.unwrap();
assert_eq!(latest_version.load().version(), &version1);
assert_eq!(latest_version.load().latest_version().version(), &version1);
}

// test recovery with pending refill task
Expand All @@ -1050,11 +1062,11 @@ mod tests {
refill_task_rx.recv().await.unwrap();
assert_eq!(old_version3.version(), &version2);
assert_eq!(new_version3.version(), &version3);
assert_eq!(latest_version.load().version(), &version1);
assert_eq!(latest_version.load().latest_version().version(), &version1);

let rx = send_clear(version3.id);
rx.await.unwrap();
assert_eq!(latest_version.load().version(), &version3);
assert_eq!(latest_version.load().latest_version().version(), &version3);
}

async fn assert_pending(fut: &mut (impl Future + Unpin)) {
Expand All @@ -1081,7 +1093,7 @@ mod tests {
)))
.unwrap();
rx.await.unwrap();
assert_eq!(latest_version.load().version(), &version5);
assert_eq!(latest_version.load().latest_version().version(), &version5);
}
}

Expand Down
29 changes: 21 additions & 8 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,8 @@ pub(crate) mod tests {
let new_pinned_version = uploader
.context
.pinned_version
.new_pin_version(test_hummock_version(epoch1));
.new_pin_version(test_hummock_version(epoch1))
.unwrap();
uploader.update_pinned_version(new_pinned_version);
assert_eq!(epoch1, uploader.max_committed_epoch());
}
Expand Down Expand Up @@ -1672,7 +1673,8 @@ pub(crate) mod tests {
let new_pinned_version = uploader
.context
.pinned_version
.new_pin_version(test_hummock_version(epoch1));
.new_pin_version(test_hummock_version(epoch1))
.unwrap();
uploader.update_pinned_version(new_pinned_version);
assert!(uploader.data().syncing_data.is_empty());
assert_eq!(epoch1, uploader.max_committed_epoch());
Expand Down Expand Up @@ -1706,7 +1708,8 @@ pub(crate) mod tests {
let new_pinned_version = uploader
.context
.pinned_version
.new_pin_version(test_hummock_version(epoch1));
.new_pin_version(test_hummock_version(epoch1))
.unwrap();
uploader.update_pinned_version(new_pinned_version);
assert!(uploader.data().syncing_data.is_empty());
assert_eq!(epoch1, uploader.max_committed_epoch());
Expand All @@ -1730,11 +1733,21 @@ pub(crate) mod tests {
let epoch4 = epoch3.next_epoch();
let epoch5 = epoch4.next_epoch();
let epoch6 = epoch5.next_epoch();
let version1 = initial_pinned_version.new_pin_version(test_hummock_version(epoch1));
let version2 = initial_pinned_version.new_pin_version(test_hummock_version(epoch2));
let version3 = initial_pinned_version.new_pin_version(test_hummock_version(epoch3));
let version4 = initial_pinned_version.new_pin_version(test_hummock_version(epoch4));
let version5 = initial_pinned_version.new_pin_version(test_hummock_version(epoch5));
let version1 = initial_pinned_version
.new_pin_version(test_hummock_version(epoch1))
.unwrap();
let version2 = initial_pinned_version
.new_pin_version(test_hummock_version(epoch2))
.unwrap();
let version3 = initial_pinned_version
.new_pin_version(test_hummock_version(epoch3))
.unwrap();
let version4 = initial_pinned_version
.new_pin_version(test_hummock_version(epoch4))
.unwrap();
let version5 = initial_pinned_version
.new_pin_version(test_hummock_version(epoch5))
.unwrap();

uploader.start_epochs_for_test([epoch6]);
uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/local_version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
// limitations under the License.

pub mod pinned_version;
pub mod recent_versions;
9 changes: 6 additions & 3 deletions src/storage/src/hummock/local_version/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,25 @@ impl PinnedVersion {
}
}

pub fn new_pin_version(&self, version: HummockVersion) -> Self {
pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
assert!(
version.id >= self.version.id,
"pinning a older version {}. Current is {}",
version.id,
self.version.id
);
if version.id == self.version.id {
return None;
}
let version_id = version.id;

PinnedVersion {
Some(PinnedVersion {
version: Arc::new(version),
guard: Arc::new(PinnedVersionGuard::new(
version_id,
self.guard.pinned_version_manager_tx.clone(),
)),
}
})
}

pub fn id(&self) -> HummockVersionId {
Expand Down
Loading

0 comments on commit c882bda

Please sign in to comment.